From 538a543fa35871bfc59f97b1b4536765fbe0041a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 5 Dec 2019 17:48:57 +0800 Subject: [PATCH 01/21] ARROW-7329: [Java] AllocationManager: Allow managing different types of memory other than those are allocated using Netty --- .../arrow/memory/AllocationManager.java | 166 ++------------- .../arrow/memory/AllocationManagerBase.java | 195 ++++++++++++++++++ .../apache/arrow/memory/BaseAllocator.java | 36 ++-- .../org/apache/arrow/memory/BufferLedger.java | 11 +- .../arrow/memory/TestBaseAllocator.java | 80 ++++++- 5 files changed, 311 insertions(+), 177 deletions(-) create mode 100644 java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index d516d9c0150..91cdcbdf5ac 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -17,80 +17,28 @@ package org.apache.arrow.memory; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.arrow.util.Preconditions; - import io.netty.buffer.PooledByteBufAllocatorL; import io.netty.buffer.UnsafeDirectLittleEndian; /** - * Manages the relationship between one or more allocators and a particular UDLE. Ensures that - * one allocator owns the - * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its - * associated allocators. - * This class is also responsible for managing when memory is allocated and returned to the - * Netty-based - * PooledByteBufAllocatorL. - * - *

The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's - * package which need access - * to these objects or methods. - * - *

Threading: AllocationManager manages thread-safety internally. Operations within the context - * of a single BufferLedger - * are lockless in nature and can be leveraged by multiple threads. Operations that cross the - * context of two ledgers - * will acquire a lock on the AllocationManager instance. Important note, there is one - * AllocationManager per - * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a - * typical query. The - * contention of acquiring a lock on AllocationManager should be very low. + * The default implementation of AllocationManagerBase. The implementation is responsible for managing when memory + * is allocated and returned to the Netty-based PooledByteBufAllocatorL. */ -public class AllocationManager { +public class AllocationManager extends AllocationManagerBase { - private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); - static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); - private final RootAllocator root; - private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); private final int size; private final UnsafeDirectLittleEndian memoryChunk; - // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap - // see JIRA for details - private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); - private final long amCreationTime = System.nanoTime(); - - // The ReferenceManager created at the time of creation of this AllocationManager - // is treated as the owning reference manager for the underlying chunk of memory - // managed by this allocation manager - private volatile BufferLedger owningLedger; - private volatile long amDestructionTime = 0; AllocationManager(BaseAllocator accountingAllocator, int size) { - Preconditions.checkNotNull(accountingAllocator); - accountingAllocator.assertOpen(); - - this.root = accountingAllocator.root; + super(accountingAllocator); this.memoryChunk = INNER_ALLOCATOR.allocate(size); - - // we do a no retain association since our creator will want to retrieve the newly created - // ledger and will create a reference count at that point - this.owningLedger = associate(accountingAllocator, false); this.size = memoryChunk.capacity(); } - BufferLedger getOwningLedger() { - return owningLedger; - } - - void setOwningLedger(final BufferLedger ledger) { - this.owningLedger = ledger; - } - /** * Get the underlying memory chunk managed by this AllocationManager. * @return buffer @@ -99,108 +47,18 @@ UnsafeDirectLittleEndian getMemoryChunk() { return memoryChunk; } - /** - * Associate the existing underlying buffer with a new allocator. This will increase the - * reference count on the corresponding buffer ledger by 1 - * - * @param allocator The target allocator to associate this buffer with. - * @return The reference manager (new or existing) that associates the underlying - * buffer to this new ledger. - */ - BufferLedger associate(final BaseAllocator allocator) { - return associate(allocator, true); - } - - private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { - allocator.assertOpen(); - Preconditions.checkState(root == allocator.root, - "A buffer can only be associated between two allocators that share the same root"); - - synchronized (this) { - BufferLedger ledger = map.get(allocator); - if (ledger != null) { - if (retain) { - // bump the ref count for the ledger - ledger.increment(); - } - return ledger; - } - - ledger = new BufferLedger(allocator, this); - - if (retain) { - // the new reference manager will have a ref count of 1 - ledger.increment(); - } - - // store the mapping for - BufferLedger oldLedger = map.put(ledger); - Preconditions.checkState(oldLedger == null, - "Detected inconsistent state: A reference manager already exists for this allocator"); - - // needed for debugging only: keep a pointer to reference manager inside allocator - // to dump state, verify allocator state etc - allocator.associateLedger(ledger); - return ledger; - } + @Override + long memoryAddress() { + return memoryChunk.memoryAddress(); } - /** - * The way that a particular ReferenceManager (BufferLedger) communicates back to the - * AllocationManager that it no longer needs to hold a reference to a particular - * piece of memory. Reference manager needs to hold a lock to invoke this method - * It is called when the shared refcount of all the ArrowBufs managed by the - * calling ReferenceManager drops to 0. - */ - void release(final BufferLedger ledger) { - final BaseAllocator allocator = (BaseAllocator)ledger.getAllocator(); - allocator.assertOpen(); - - // remove the mapping for the allocator - // of calling BufferLedger - Preconditions.checkState(map.containsKey(allocator), - "Expecting a mapping for allocator and reference manager"); - final BufferLedger oldLedger = map.remove(allocator); - - // needed for debug only: tell the allocator that AllocationManager is removing a - // reference manager associated with this particular allocator - ((BaseAllocator)oldLedger.getAllocator()).dissociateLedger(oldLedger); - - if (oldLedger == owningLedger) { - // the release call was made by the owning reference manager - if (map.isEmpty()) { - // the only mapping was for the owner - // which now has been removed, it implies we can safely destroy the - // underlying memory chunk as it is no longer being referenced - ((BaseAllocator)oldLedger.getAllocator()).releaseBytes(size); - // free the memory chunk associated with the allocation manager - memoryChunk.release(); - ((BaseAllocator)oldLedger.getAllocator()).getListener().onRelease(size); - amDestructionTime = System.nanoTime(); - owningLedger = null; - } else { - // since the refcount dropped to 0 for the owning reference manager and allocation - // manager will no longer keep a mapping for it, we need to change the owning - // reference manager to whatever the next available - // mapping exists. - BufferLedger newOwningLedger = map.getNextValue(); - // we'll forcefully transfer the ownership and not worry about whether we - // exceeded the limit since this consumer can't do anything with this. - oldLedger.transferBalance(newOwningLedger); - } - } else { - // the release call was made by a non-owning reference manager, so after remove there have - // to be 1 or more mappings - Preconditions.checkState(map.size() > 0, - "The final removal of reference manager should be connected to owning reference manager"); - } + @Override + protected void release0() { + memoryChunk.release(); } - /** - * Return the size of underlying chunk of memory managed by this Allocation Manager. - * @return size of memory chunk - */ - public int getSize() { + @Override + protected int getSize() { return size; } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java new file mode 100644 index 00000000000..e8cf0ad7fbd --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.util.Preconditions; + +/** + * The abstract base class of AllocationManager. + * + *

Manages the relationship between one or more allocators and a particular UDLE. Ensures that + * one allocator owns the + * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its + * associated allocators. + * + *

The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's + * package which need access + * to these objects or methods. + * + *

Threading: AllocationManager manages thread-safety internally. Operations within the context + * of a single BufferLedger + * are lockless in nature and can be leveraged by multiple threads. Operations that cross the + * context of two ledgers + * will acquire a lock on the AllocationManager instance. Important note, there is one + * AllocationManager per + * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a + * typical query. The + * contention of acquiring a lock on AllocationManager should be very low. + */ +public abstract class AllocationManagerBase { + + private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); + + private final RootAllocator root; + private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); + // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap + // see JIRA for details + private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); + private final long amCreationTime = System.nanoTime(); + + // The ReferenceManager created at the time of creation of this AllocationManager + // is treated as the owning reference manager for the underlying chunk of memory + // managed by this allocation manager + private volatile BufferLedger owningLedger; + private volatile long amDestructionTime = 0; + + AllocationManagerBase(BaseAllocator accountingAllocator) { + Preconditions.checkNotNull(accountingAllocator); + accountingAllocator.assertOpen(); + + this.root = accountingAllocator.root; + + // we do a no retain association since our creator will want to retrieve the newly created + // ledger and will create a reference count at that point + this.owningLedger = associate(accountingAllocator, false); + } + + BufferLedger getOwningLedger() { + return owningLedger; + } + + void setOwningLedger(final BufferLedger ledger) { + this.owningLedger = ledger; + } + + /** + * Associate the existing underlying buffer with a new allocator. This will increase the + * reference count on the corresponding buffer ledger by 1 + * + * @param allocator The target allocator to associate this buffer with. + * @return The reference manager (new or existing) that associates the underlying + * buffer to this new ledger. + */ + BufferLedger associate(final BaseAllocator allocator) { + return associate(allocator, true); + } + + private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { + allocator.assertOpen(); + Preconditions.checkState(root == allocator.root, + "A buffer can only be associated between two allocators that share the same root"); + + synchronized (this) { + BufferLedger ledger = map.get(allocator); + if (ledger != null) { + if (retain) { + // bump the ref count for the ledger + ledger.increment(); + } + return ledger; + } + + ledger = new BufferLedger(allocator, this); + + if (retain) { + // the new reference manager will have a ref count of 1 + ledger.increment(); + } + + // store the mapping for + BufferLedger oldLedger = map.put(ledger); + Preconditions.checkState(oldLedger == null, + "Detected inconsistent state: A reference manager already exists for this allocator"); + + // needed for debugging only: keep a pointer to reference manager inside allocator + // to dump state, verify allocator state etc + allocator.associateLedger(ledger); + return ledger; + } + } + + /** + * The way that a particular ReferenceManager (BufferLedger) communicates back to the + * AllocationManager that it no longer needs to hold a reference to a particular + * piece of memory. Reference manager needs to hold a lock to invoke this method + * It is called when the shared refcount of all the ArrowBufs managed by the + * calling ReferenceManager drops to 0. + */ + void release(final BufferLedger ledger) { + final BaseAllocator allocator = (BaseAllocator)ledger.getAllocator(); + allocator.assertOpen(); + + // remove the mapping for the allocator + // of calling BufferLedger + Preconditions.checkState(map.containsKey(allocator), + "Expecting a mapping for allocator and reference manager"); + final BufferLedger oldLedger = map.remove(allocator); + + // needed for debug only: tell the allocator that AllocationManager is removing a + // reference manager associated with this particular allocator + ((BaseAllocator)oldLedger.getAllocator()).dissociateLedger(oldLedger); + + if (oldLedger == owningLedger) { + // the release call was made by the owning reference manager + if (map.isEmpty()) { + // the only mapping was for the owner + // which now has been removed, it implies we can safely destroy the + // underlying memory chunk as it is no longer being referenced + ((BaseAllocator)oldLedger.getAllocator()).releaseBytes(getSize()); + // free the memory chunk associated with the allocation manager + release0(); + ((BaseAllocator)oldLedger.getAllocator()).getListener().onRelease(getSize()); + amDestructionTime = System.nanoTime(); + owningLedger = null; + } else { + // since the refcount dropped to 0 for the owning reference manager and allocation + // manager will no longer keep a mapping for it, we need to change the owning + // reference manager to whatever the next available + // mapping exists. + BufferLedger newOwningLedger = map.getNextValue(); + // we'll forcefully transfer the ownership and not worry about whether we + // exceeded the limit since this consumer can't do anything with this. + oldLedger.transferBalance(newOwningLedger); + } + } else { + // the release call was made by a non-owning reference manager, so after remove there have + // to be 1 or more mappings + Preconditions.checkState(map.size() > 0, + "The final removal of reference manager should be connected to owning reference manager"); + } + } + + /** + * Return the absolute memory address pointing to the fist byte of underling memory chunk. + */ + abstract long memoryAddress(); + + /** + * Release the underling memory chunk. + */ + abstract void release0(); + + /** + * Return the size of underlying chunk of memory managed by this Allocation Manager. + * + * @return size of underlying memory chunk + */ + abstract int getSize(); +} diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 47fd9e93a92..a3632401492 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -31,7 +31,6 @@ import org.apache.arrow.util.Preconditions; import io.netty.buffer.ArrowBuf; -import io.netty.buffer.UnsafeDirectLittleEndian; import io.netty.util.internal.OutOfDirectMemoryError; /** @@ -344,7 +343,7 @@ private ArrowBuf bufferWithoutReservation( BufferManager bufferManager) throws OutOfMemoryException { assertOpen(); - final AllocationManager manager = new AllocationManager(this, size); + final AllocationManagerBase manager = newAllocationManager(size); final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required) final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager); @@ -355,6 +354,14 @@ private ArrowBuf bufferWithoutReservation( return buffer; } + private AllocationManagerBase newAllocationManager(int size) { + return newAllocationManager(this, size); + } + + protected AllocationManagerBase newAllocationManager(BaseAllocator accountingAllocator, int size) { + return new AllocationManager(accountingAllocator, size); + } + @Override public ArrowByteBufAllocator getAsByteBufAllocator() { return thisAsByteBufAllocator; @@ -377,7 +384,12 @@ public BufferAllocator newChildAllocator( assertOpen(); final ChildAllocator childAllocator = - new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy); + new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy) { + @Override + protected AllocationManagerBase newAllocationManager(BaseAllocator accountingAllocator, int size) { + return BaseAllocator.this.newAllocationManager(accountingAllocator, size); + } + }; if (DEBUG) { synchronized (DEBUG_LOCK) { @@ -517,7 +529,7 @@ private void hist(String noteFormat, Object... args) { * @throws IllegalStateException when any problems are found */ void verifyAllocator() { - final IdentityHashMap seen = new IdentityHashMap<>(); + final IdentityHashMap seen = new IdentityHashMap<>(); verifyAllocator(seen); } @@ -531,7 +543,7 @@ void verifyAllocator() { * @throws IllegalStateException when any problems are found */ private void verifyAllocator( - final IdentityHashMap buffersSeen) { + final IdentityHashMap buffersSeen) { // The remaining tests can only be performed if we're in debug mode. if (!DEBUG) { return; @@ -578,19 +590,19 @@ private void verifyAllocator( continue; } - final UnsafeDirectLittleEndian udle = ledger.getUnderlying(); + final AllocationManagerBase amb = ledger.getAllocationManager(); /* * Even when shared, ArrowBufs are rewrapped, so we should never see the same instance * twice. */ - final BaseAllocator otherOwner = buffersSeen.get(udle); + final BaseAllocator otherOwner = buffersSeen.get(amb); if (otherOwner != null) { throw new IllegalStateException("This allocator's ArrowBuf already owned by another " + "allocator"); } - buffersSeen.put(udle, this); + buffersSeen.put(amb, this); - bufferTotal += udle.capacity(); + bufferTotal += amb.getSize(); } // Preallocated space has to be accounted for @@ -702,11 +714,11 @@ private void dumpBuffers(final StringBuilder sb, final Set ledgerS if (!ledger.isOwningLedger()) { continue; } - final UnsafeDirectLittleEndian udle = ledger.getUnderlying(); + final AllocationManagerBase amb = ledger.getAllocationManager(); sb.append("UnsafeDirectLittleEndian[identityHashCode == "); - sb.append(Integer.toString(System.identityHashCode(udle))); + sb.append(Integer.toString(System.identityHashCode(amb))); sb.append("] size "); - sb.append(Integer.toString(udle.capacity())); + sb.append(Integer.toString(amb.getSize())); sb.append('\n'); } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index 90d74a3ca3b..0ad09d4e51c 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -27,7 +27,6 @@ import org.apache.arrow.util.Preconditions; import io.netty.buffer.ArrowBuf; -import io.netty.buffer.UnsafeDirectLittleEndian; /** * The reference manager that binds an {@link AllocationManager} to @@ -46,13 +45,13 @@ public class BufferLedger implements ValueWithKeyIncluded, Refere // correctly private final long lCreationTime = System.nanoTime(); private final BaseAllocator allocator; - private final AllocationManager allocationManager; + private final AllocationManagerBase allocationManager; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; private volatile long lDestructionTime = 0; - BufferLedger(final BaseAllocator allocator, final AllocationManager allocationManager) { + BufferLedger(final BaseAllocator allocator, final AllocationManagerBase allocationManager) { this.allocator = allocator; this.allocationManager = allocationManager; } @@ -269,7 +268,7 @@ ArrowBuf newArrowBuf(final int length, final BufferManager manager) { allocator.assertOpen(); // the start virtual address of the ArrowBuf will be same as address of memory chunk - final long startAddress = allocationManager.getMemoryChunk().memoryAddress(); + final long startAddress = allocationManager.memoryAddress(); // create ArrowBuf final ArrowBuf buf = new ArrowBuf(this, manager, length, startAddress, false); @@ -523,7 +522,7 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { } } - public UnsafeDirectLittleEndian getUnderlying() { - return allocationManager.getMemoryChunk(); + public AllocationManagerBase getAllocationManager() { + return allocationManager; } } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index fd5bccd58c9..0545577d462 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -17,14 +17,12 @@ package org.apache.arrow.memory; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; + +import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -38,6 +36,7 @@ import org.junit.jupiter.api.Assertions; import io.netty.buffer.ArrowBuf; +import sun.misc.Unsafe; public class TestBaseAllocator { @@ -359,6 +358,77 @@ public void testSegmentAllocator_segmentSizeNotPowerOf2() { assertEquals("The segment size must be a power of 2", e.getMessage()); } + @Test + public void testCustomizedAllocationManager() { + try (RootAllocator allocator = createAllocatorWithCustomizedAllocationManager()) { + final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf1); + + arrowBuf1.setInt(0, 1); + assertEquals(1, arrowBuf1.getInt(0)); + + try { + final ArrowBuf arrowBuf2 = allocator.buffer(1); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + arrowBuf1.getReferenceManager().release(); + + try { + arrowBuf1.getInt(0); + fail("data read from released buffer"); + } catch (RuntimeException e) { + // expected + } + } + } + + private RootAllocator createAllocatorWithCustomizedAllocationManager() { + return new RootAllocator(MAX_ALLOCATION) { + + @Override + protected AllocationManagerBase newAllocationManager(BaseAllocator accountingAllocator, int size) { + + return new AllocationManagerBase(accountingAllocator) { + private final Unsafe unsafe = getUnsafe(); + private final long address = unsafe.allocateMemory(size); + + @Override + long memoryAddress() { + return address; + } + + @Override + void release0() { + unsafe.setMemory(address, size, (byte) 0); + unsafe.freeMemory(address); + } + + @Override + int getSize() { + return size; + } + + private Unsafe getUnsafe() { + Field f = null; + try { + f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } finally { + if (f != null) { + f.setAccessible(false); + } + } + } + }; + } + }; + } + // Allocation listener // It counts the number of times it has been invoked, and how much memory allocation it has seen // When set to 'expand on fail', it attempts to expand the associated allocator's limit From 1e6e844f078259b749b385f1d8e7b6d6f94dcb5c Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 08:42:50 +0800 Subject: [PATCH 02/21] Change class names --- .../main/java/io/netty/buffer/ArrowBuf.java | 4 +- .../arrow/memory/AllocationManager.java | 187 ++++++++++++++--- .../arrow/memory/AllocationManagerBase.java | 195 ------------------ .../apache/arrow/memory/BaseAllocator.java | 30 +-- .../org/apache/arrow/memory/BufferLedger.java | 14 +- .../arrow/memory/NettyAllocationManager.java | 64 ++++++ .../rounding/DefaultRoundingPolicy.java | 4 +- .../arrow/memory/TestBaseAllocator.java | 4 +- 8 files changed, 251 insertions(+), 251 deletions(-) delete mode 100644 java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java create mode 100644 java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 847cd012544..daccc87afaa 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -25,7 +25,7 @@ import java.nio.ReadOnlyBufferException; import java.util.concurrent.atomic.AtomicLong; -import org.apache.arrow.memory.AllocationManager; +import org.apache.arrow.memory.NettyAllocationManager; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.BoundsChecking; @@ -54,7 +54,7 @@ * The mangement (allocation, deallocation, reference counting etc) for * the memory chunk is not done by ArrowBuf. * Default implementation of ReferenceManager, allocation is in - * {@link BaseAllocator}, {@link BufferLedger} and {@link AllocationManager} + * {@link BaseAllocator}, {@link BufferLedger} and {@link NettyAllocationManager} *

*/ public final class ArrowBuf implements AutoCloseable { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 91cdcbdf5ac..b5d2daf06b2 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -17,48 +17,179 @@ package org.apache.arrow.memory; -import io.netty.buffer.PooledByteBufAllocatorL; -import io.netty.buffer.UnsafeDirectLittleEndian; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.util.Preconditions; /** - * The default implementation of AllocationManagerBase. The implementation is responsible for managing when memory - * is allocated and returned to the Netty-based PooledByteBufAllocatorL. + * The abstract base class of AllocationManager. + * + *

Manages the relationship between one or more allocators and a particular UDLE. Ensures that + * one allocator owns the + * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its + * associated allocators. + * + *

The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's + * package which need access + * to these objects or methods. + * + *

Threading: AllocationManager manages thread-safety internally. Operations within the context + * of a single BufferLedger + * are lockless in nature and can be leveraged by multiple threads. Operations that cross the + * context of two ledgers + * will acquire a lock on the AllocationManager instance. Important note, there is one + * AllocationManager per + * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a + * typical query. The + * contention of acquiring a lock on AllocationManager should be very low. */ -public class AllocationManager extends AllocationManagerBase { +public abstract class AllocationManager { - private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); - static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; - static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); + private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); - private final int size; - private final UnsafeDirectLittleEndian memoryChunk; + private final RootAllocator root; + private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); + // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap + // see JIRA for details + private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); + private final long amCreationTime = System.nanoTime(); - AllocationManager(BaseAllocator accountingAllocator, int size) { - super(accountingAllocator); - this.memoryChunk = INNER_ALLOCATOR.allocate(size); - this.size = memoryChunk.capacity(); + // The ReferenceManager created at the time of creation of this AllocationManager + // is treated as the owning reference manager for the underlying chunk of memory + // managed by this allocation manager + private volatile BufferLedger owningLedger; + private volatile long amDestructionTime = 0; + + AllocationManager(BaseAllocator accountingAllocator) { + Preconditions.checkNotNull(accountingAllocator); + accountingAllocator.assertOpen(); + + this.root = accountingAllocator.root; + + // we do a no retain association since our creator will want to retrieve the newly created + // ledger and will create a reference count at that point + this.owningLedger = associate(accountingAllocator, false); + } + + BufferLedger getOwningLedger() { + return owningLedger; + } + + void setOwningLedger(final BufferLedger ledger) { + this.owningLedger = ledger; } /** - * Get the underlying memory chunk managed by this AllocationManager. - * @return buffer + * Associate the existing underlying buffer with a new allocator. This will increase the + * reference count on the corresponding buffer ledger by 1 + * + * @param allocator The target allocator to associate this buffer with. + * @return The reference manager (new or existing) that associates the underlying + * buffer to this new ledger. */ - UnsafeDirectLittleEndian getMemoryChunk() { - return memoryChunk; + BufferLedger associate(final BaseAllocator allocator) { + return associate(allocator, true); } - @Override - long memoryAddress() { - return memoryChunk.memoryAddress(); - } + private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { + allocator.assertOpen(); + Preconditions.checkState(root == allocator.root, + "A buffer can only be associated between two allocators that share the same root"); + + synchronized (this) { + BufferLedger ledger = map.get(allocator); + if (ledger != null) { + if (retain) { + // bump the ref count for the ledger + ledger.increment(); + } + return ledger; + } + + ledger = new BufferLedger(allocator, this); + + if (retain) { + // the new reference manager will have a ref count of 1 + ledger.increment(); + } - @Override - protected void release0() { - memoryChunk.release(); + // store the mapping for + BufferLedger oldLedger = map.put(ledger); + Preconditions.checkState(oldLedger == null, + "Detected inconsistent state: A reference manager already exists for this allocator"); + + // needed for debugging only: keep a pointer to reference manager inside allocator + // to dump state, verify allocator state etc + allocator.associateLedger(ledger); + return ledger; + } } - @Override - protected int getSize() { - return size; + /** + * The way that a particular ReferenceManager (BufferLedger) communicates back to the + * AllocationManager that it no longer needs to hold a reference to a particular + * piece of memory. Reference manager needs to hold a lock to invoke this method + * It is called when the shared refcount of all the ArrowBufs managed by the + * calling ReferenceManager drops to 0. + */ + void release(final BufferLedger ledger) { + final BaseAllocator allocator = (BaseAllocator)ledger.getAllocator(); + allocator.assertOpen(); + + // remove the mapping for the allocator + // of calling BufferLedger + Preconditions.checkState(map.containsKey(allocator), + "Expecting a mapping for allocator and reference manager"); + final BufferLedger oldLedger = map.remove(allocator); + + // needed for debug only: tell the allocator that AllocationManager is removing a + // reference manager associated with this particular allocator + ((BaseAllocator)oldLedger.getAllocator()).dissociateLedger(oldLedger); + + if (oldLedger == owningLedger) { + // the release call was made by the owning reference manager + if (map.isEmpty()) { + // the only mapping was for the owner + // which now has been removed, it implies we can safely destroy the + // underlying memory chunk as it is no longer being referenced + ((BaseAllocator)oldLedger.getAllocator()).releaseBytes(getSize()); + // free the memory chunk associated with the allocation manager + release0(); + ((BaseAllocator)oldLedger.getAllocator()).getListener().onRelease(getSize()); + amDestructionTime = System.nanoTime(); + owningLedger = null; + } else { + // since the refcount dropped to 0 for the owning reference manager and allocation + // manager will no longer keep a mapping for it, we need to change the owning + // reference manager to whatever the next available + // mapping exists. + BufferLedger newOwningLedger = map.getNextValue(); + // we'll forcefully transfer the ownership and not worry about whether we + // exceeded the limit since this consumer can't do anything with this. + oldLedger.transferBalance(newOwningLedger); + } + } else { + // the release call was made by a non-owning reference manager, so after remove there have + // to be 1 or more mappings + Preconditions.checkState(map.size() > 0, + "The final removal of reference manager should be connected to owning reference manager"); + } } + + /** + * Return the absolute memory address pointing to the fist byte of underling memory chunk. + */ + abstract long memoryAddress(); + + /** + * Release the underling memory chunk. + */ + abstract void release0(); + + /** + * Return the size of underlying chunk of memory managed by this Allocation Manager. + * + * @return size of underlying memory chunk + */ + abstract int getSize(); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java deleted file mode 100644 index e8cf0ad7fbd..00000000000 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManagerBase.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.memory; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.arrow.util.Preconditions; - -/** - * The abstract base class of AllocationManager. - * - *

Manages the relationship between one or more allocators and a particular UDLE. Ensures that - * one allocator owns the - * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its - * associated allocators. - * - *

The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's - * package which need access - * to these objects or methods. - * - *

Threading: AllocationManager manages thread-safety internally. Operations within the context - * of a single BufferLedger - * are lockless in nature and can be leveraged by multiple threads. Operations that cross the - * context of two ledgers - * will acquire a lock on the AllocationManager instance. Important note, there is one - * AllocationManager per - * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a - * typical query. The - * contention of acquiring a lock on AllocationManager should be very low. - */ -public abstract class AllocationManagerBase { - - private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); - - private final RootAllocator root; - private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); - // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap - // see JIRA for details - private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); - private final long amCreationTime = System.nanoTime(); - - // The ReferenceManager created at the time of creation of this AllocationManager - // is treated as the owning reference manager for the underlying chunk of memory - // managed by this allocation manager - private volatile BufferLedger owningLedger; - private volatile long amDestructionTime = 0; - - AllocationManagerBase(BaseAllocator accountingAllocator) { - Preconditions.checkNotNull(accountingAllocator); - accountingAllocator.assertOpen(); - - this.root = accountingAllocator.root; - - // we do a no retain association since our creator will want to retrieve the newly created - // ledger and will create a reference count at that point - this.owningLedger = associate(accountingAllocator, false); - } - - BufferLedger getOwningLedger() { - return owningLedger; - } - - void setOwningLedger(final BufferLedger ledger) { - this.owningLedger = ledger; - } - - /** - * Associate the existing underlying buffer with a new allocator. This will increase the - * reference count on the corresponding buffer ledger by 1 - * - * @param allocator The target allocator to associate this buffer with. - * @return The reference manager (new or existing) that associates the underlying - * buffer to this new ledger. - */ - BufferLedger associate(final BaseAllocator allocator) { - return associate(allocator, true); - } - - private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { - allocator.assertOpen(); - Preconditions.checkState(root == allocator.root, - "A buffer can only be associated between two allocators that share the same root"); - - synchronized (this) { - BufferLedger ledger = map.get(allocator); - if (ledger != null) { - if (retain) { - // bump the ref count for the ledger - ledger.increment(); - } - return ledger; - } - - ledger = new BufferLedger(allocator, this); - - if (retain) { - // the new reference manager will have a ref count of 1 - ledger.increment(); - } - - // store the mapping for - BufferLedger oldLedger = map.put(ledger); - Preconditions.checkState(oldLedger == null, - "Detected inconsistent state: A reference manager already exists for this allocator"); - - // needed for debugging only: keep a pointer to reference manager inside allocator - // to dump state, verify allocator state etc - allocator.associateLedger(ledger); - return ledger; - } - } - - /** - * The way that a particular ReferenceManager (BufferLedger) communicates back to the - * AllocationManager that it no longer needs to hold a reference to a particular - * piece of memory. Reference manager needs to hold a lock to invoke this method - * It is called when the shared refcount of all the ArrowBufs managed by the - * calling ReferenceManager drops to 0. - */ - void release(final BufferLedger ledger) { - final BaseAllocator allocator = (BaseAllocator)ledger.getAllocator(); - allocator.assertOpen(); - - // remove the mapping for the allocator - // of calling BufferLedger - Preconditions.checkState(map.containsKey(allocator), - "Expecting a mapping for allocator and reference manager"); - final BufferLedger oldLedger = map.remove(allocator); - - // needed for debug only: tell the allocator that AllocationManager is removing a - // reference manager associated with this particular allocator - ((BaseAllocator)oldLedger.getAllocator()).dissociateLedger(oldLedger); - - if (oldLedger == owningLedger) { - // the release call was made by the owning reference manager - if (map.isEmpty()) { - // the only mapping was for the owner - // which now has been removed, it implies we can safely destroy the - // underlying memory chunk as it is no longer being referenced - ((BaseAllocator)oldLedger.getAllocator()).releaseBytes(getSize()); - // free the memory chunk associated with the allocation manager - release0(); - ((BaseAllocator)oldLedger.getAllocator()).getListener().onRelease(getSize()); - amDestructionTime = System.nanoTime(); - owningLedger = null; - } else { - // since the refcount dropped to 0 for the owning reference manager and allocation - // manager will no longer keep a mapping for it, we need to change the owning - // reference manager to whatever the next available - // mapping exists. - BufferLedger newOwningLedger = map.getNextValue(); - // we'll forcefully transfer the ownership and not worry about whether we - // exceeded the limit since this consumer can't do anything with this. - oldLedger.transferBalance(newOwningLedger); - } - } else { - // the release call was made by a non-owning reference manager, so after remove there have - // to be 1 or more mappings - Preconditions.checkState(map.size() > 0, - "The final removal of reference manager should be connected to owning reference manager"); - } - } - - /** - * Return the absolute memory address pointing to the fist byte of underling memory chunk. - */ - abstract long memoryAddress(); - - /** - * Release the underling memory chunk. - */ - abstract void release0(); - - /** - * Return the size of underlying chunk of memory managed by this Allocation Manager. - * - * @return size of underlying memory chunk - */ - abstract int getSize(); -} diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index a3632401492..1015bea678b 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -279,7 +279,7 @@ public ArrowBuf buffer(final int initialRequestSize) { } private ArrowBuf createEmpty() { - return new ArrowBuf(ReferenceManager.NO_OP, null, 0, AllocationManager.EMPTY.memoryAddress(), true); + return new ArrowBuf(ReferenceManager.NO_OP, null, 0, NettyAllocationManager.EMPTY.memoryAddress(), true); } @Override @@ -343,7 +343,7 @@ private ArrowBuf bufferWithoutReservation( BufferManager bufferManager) throws OutOfMemoryException { assertOpen(); - final AllocationManagerBase manager = newAllocationManager(size); + final AllocationManager manager = newAllocationManager(size); final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required) final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager); @@ -354,12 +354,12 @@ private ArrowBuf bufferWithoutReservation( return buffer; } - private AllocationManagerBase newAllocationManager(int size) { + private AllocationManager newAllocationManager(int size) { return newAllocationManager(this, size); } - protected AllocationManagerBase newAllocationManager(BaseAllocator accountingAllocator, int size) { - return new AllocationManager(accountingAllocator, size); + protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { + return new NettyAllocationManager(accountingAllocator, size); } @Override @@ -386,7 +386,7 @@ public BufferAllocator newChildAllocator( final ChildAllocator childAllocator = new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy) { @Override - protected AllocationManagerBase newAllocationManager(BaseAllocator accountingAllocator, int size) { + protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { return BaseAllocator.this.newAllocationManager(accountingAllocator, size); } }; @@ -529,7 +529,7 @@ private void hist(String noteFormat, Object... args) { * @throws IllegalStateException when any problems are found */ void verifyAllocator() { - final IdentityHashMap seen = new IdentityHashMap<>(); + final IdentityHashMap seen = new IdentityHashMap<>(); verifyAllocator(seen); } @@ -543,7 +543,7 @@ void verifyAllocator() { * @throws IllegalStateException when any problems are found */ private void verifyAllocator( - final IdentityHashMap buffersSeen) { + final IdentityHashMap buffersSeen) { // The remaining tests can only be performed if we're in debug mode. if (!DEBUG) { return; @@ -590,19 +590,19 @@ private void verifyAllocator( continue; } - final AllocationManagerBase amb = ledger.getAllocationManager(); + final AllocationManager am = ledger.getAllocationManager(); /* * Even when shared, ArrowBufs are rewrapped, so we should never see the same instance * twice. */ - final BaseAllocator otherOwner = buffersSeen.get(amb); + final BaseAllocator otherOwner = buffersSeen.get(am); if (otherOwner != null) { throw new IllegalStateException("This allocator's ArrowBuf already owned by another " + "allocator"); } - buffersSeen.put(amb, this); + buffersSeen.put(am, this); - bufferTotal += amb.getSize(); + bufferTotal += am.getSize(); } // Preallocated space has to be accounted for @@ -714,11 +714,11 @@ private void dumpBuffers(final StringBuilder sb, final Set ledgerS if (!ledger.isOwningLedger()) { continue; } - final AllocationManagerBase amb = ledger.getAllocationManager(); + final AllocationManager am = ledger.getAllocationManager(); sb.append("UnsafeDirectLittleEndian[identityHashCode == "); - sb.append(Integer.toString(System.identityHashCode(amb))); + sb.append(Integer.toString(System.identityHashCode(am))); sb.append("] size "); - sb.append(Integer.toString(amb.getSize())); + sb.append(Integer.toString(am.getSize())); sb.append('\n'); } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index 0ad09d4e51c..39ddd6a1a2a 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -29,7 +29,7 @@ import io.netty.buffer.ArrowBuf; /** - * The reference manager that binds an {@link AllocationManager} to + * The reference manager that binds an {@link NettyAllocationManager} to * {@link BufferAllocator} and a set of {@link ArrowBuf}. The set of * ArrowBufs managed by this reference manager share a common * fate (same reference count). @@ -45,13 +45,13 @@ public class BufferLedger implements ValueWithKeyIncluded, Refere // correctly private final long lCreationTime = System.nanoTime(); private final BaseAllocator allocator; - private final AllocationManagerBase allocationManager; + private final AllocationManager allocationManager; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; private volatile long lDestructionTime = 0; - BufferLedger(final BaseAllocator allocator, final AllocationManagerBase allocationManager) { + BufferLedger(final BaseAllocator allocator, final AllocationManager allocationManager) { this.allocator = allocator; this.allocationManager = allocationManager; } @@ -97,7 +97,7 @@ void increment() { * no ArrowBufs managed by this reference manager need access to the memory * chunk. In that case, the ledger should inform the allocation manager * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link AllocationManager} will + * chunk will be released is something that {@link NettyAllocationManager} will * decide since tracks the usage of memory chunk across multiple reference * managers and allocators. * @return true if the new ref count has dropped to 0, false otherwise @@ -113,7 +113,7 @@ public boolean release() { * no ArrowBufs managed by this reference manager need access to the memory * chunk. In that case, the ledger should inform the allocation manager * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link AllocationManager} will + * chunk will be released is something that {@link NettyAllocationManager} will * decide since tracks the usage of memory chunk across multiple reference * managers and allocators. * @param decrement amount to decrease the reference count by @@ -140,7 +140,7 @@ public boolean release(int decrement) { * no ArrowBufs managed by this reference manager need access to the memory * chunk. In that case, the ledger should inform the allocation manager * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link AllocationManager} will + * chunk will be released is something that {@link NettyAllocationManager} will * decide since tracks the usage of memory chunk across multiple reference * managers and allocators. * @@ -522,7 +522,7 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { } } - public AllocationManagerBase getAllocationManager() { + public AllocationManager getAllocationManager() { return allocationManager; } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java new file mode 100644 index 00000000000..59454a47747 --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.UnsafeDirectLittleEndian; + +/** + * The default implementation of AllocationManagerBase. The implementation is responsible for managing when memory + * is allocated and returned to the Netty-based PooledByteBufAllocatorL. + */ +public class NettyAllocationManager extends AllocationManager { + + private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); + static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; + static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); + + private final int size; + private final UnsafeDirectLittleEndian memoryChunk; + + NettyAllocationManager(BaseAllocator accountingAllocator, int size) { + super(accountingAllocator); + this.memoryChunk = INNER_ALLOCATOR.allocate(size); + this.size = memoryChunk.capacity(); + } + + /** + * Get the underlying memory chunk managed by this AllocationManager. + * @return buffer + */ + UnsafeDirectLittleEndian getMemoryChunk() { + return memoryChunk; + } + + @Override + long memoryAddress() { + return memoryChunk.memoryAddress(); + } + + @Override + protected void release0() { + memoryChunk.release(); + } + + @Override + protected int getSize() { + return size; + } +} diff --git a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java index 7bc8393d9e4..4bf5102989f 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java @@ -19,7 +19,7 @@ import java.lang.reflect.Field; -import org.apache.arrow.memory.AllocationManager; +import org.apache.arrow.memory.NettyAllocationManager; import org.apache.arrow.memory.BaseAllocator; /** @@ -38,7 +38,7 @@ public class DefaultRoundingPolicy implements RoundingPolicy { private DefaultRoundingPolicy() { try { - Field field = AllocationManager.class.getDeclaredField("CHUNK_SIZE"); + Field field = NettyAllocationManager.class.getDeclaredField("CHUNK_SIZE"); field.setAccessible(true); chunkSize = (Long) field.get(null); } catch (Exception e) { diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 0545577d462..55e24726cc2 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -388,9 +388,9 @@ private RootAllocator createAllocatorWithCustomizedAllocationManager() { return new RootAllocator(MAX_ALLOCATION) { @Override - protected AllocationManagerBase newAllocationManager(BaseAllocator accountingAllocator, int size) { + protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { - return new AllocationManagerBase(accountingAllocator) { + return new AllocationManager(accountingAllocator) { private final Unsafe unsafe = getUnsafe(); private final long address = unsafe.allocateMemory(size); From 8a3f79d5df761a3972998d7940b6d712e5e00949 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 08:48:17 +0800 Subject: [PATCH 03/21] Correct class names in comments --- java/memory/src/main/java/io/netty/buffer/ArrowBuf.java | 4 ++-- .../src/main/java/org/apache/arrow/memory/BufferLedger.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index daccc87afaa..847cd012544 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -25,7 +25,7 @@ import java.nio.ReadOnlyBufferException; import java.util.concurrent.atomic.AtomicLong; -import org.apache.arrow.memory.NettyAllocationManager; +import org.apache.arrow.memory.AllocationManager; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.BoundsChecking; @@ -54,7 +54,7 @@ * The mangement (allocation, deallocation, reference counting etc) for * the memory chunk is not done by ArrowBuf. * Default implementation of ReferenceManager, allocation is in - * {@link BaseAllocator}, {@link BufferLedger} and {@link NettyAllocationManager} + * {@link BaseAllocator}, {@link BufferLedger} and {@link AllocationManager} *

*/ public final class ArrowBuf implements AutoCloseable { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index 39ddd6a1a2a..c9f3dc8b215 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -29,7 +29,7 @@ import io.netty.buffer.ArrowBuf; /** - * The reference manager that binds an {@link NettyAllocationManager} to + * The reference manager that binds an {@link AllocationManager} to * {@link BufferAllocator} and a set of {@link ArrowBuf}. The set of * ArrowBufs managed by this reference manager share a common * fate (same reference count). @@ -97,7 +97,7 @@ void increment() { * no ArrowBufs managed by this reference manager need access to the memory * chunk. In that case, the ledger should inform the allocation manager * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link NettyAllocationManager} will + * chunk will be released is something that {@link AllocationManager} will * decide since tracks the usage of memory chunk across multiple reference * managers and allocators. * @return true if the new ref count has dropped to 0, false otherwise @@ -113,7 +113,7 @@ public boolean release() { * no ArrowBufs managed by this reference manager need access to the memory * chunk. In that case, the ledger should inform the allocation manager * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link NettyAllocationManager} will + * chunk will be released is something that {@link AllocationManager} will * decide since tracks the usage of memory chunk across multiple reference * managers and allocators. * @param decrement amount to decrease the reference count by From 7db394b141344215798f4c736c587643d08cfbfe Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 08:51:30 +0800 Subject: [PATCH 04/21] Import order --- .../org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java index 4bf5102989f..cfaadf8b26a 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java @@ -19,8 +19,8 @@ import java.lang.reflect.Field; -import org.apache.arrow.memory.NettyAllocationManager; import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.memory.NettyAllocationManager; /** * The default rounding policy. That is, if the requested size is within the chunk size, From 7627be8f1b6ef97f290435f2a57729b5cbdea6ed Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 08:56:40 +0800 Subject: [PATCH 05/21] A default implementation for AllocationManager::size() --- .../arrow/memory/AllocationManager.java | 20 +++++++++++-------- .../arrow/memory/NettyAllocationManager.java | 2 +- .../arrow/memory/TestBaseAllocator.java | 7 +------ 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index b5d2daf06b2..7086c1da987 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -53,6 +53,7 @@ public abstract class AllocationManager { // see JIRA for details private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); private final long amCreationTime = System.nanoTime(); + private final int size; // The ReferenceManager created at the time of creation of this AllocationManager // is treated as the owning reference manager for the underlying chunk of memory @@ -60,10 +61,11 @@ public abstract class AllocationManager { private volatile BufferLedger owningLedger; private volatile long amDestructionTime = 0; - AllocationManager(BaseAllocator accountingAllocator) { + AllocationManager(BaseAllocator accountingAllocator, int size) { Preconditions.checkNotNull(accountingAllocator); accountingAllocator.assertOpen(); + this.size = size; this.root = accountingAllocator.root; // we do a no retain association since our creator will want to retrieve the newly created @@ -176,6 +178,15 @@ void release(final BufferLedger ledger) { } } + /** + * Return the size of underlying chunk of memory managed by this Allocation Manager. + * + * @return size of underlying memory chunk + */ + protected int getSize() { + return size; + } + /** * Return the absolute memory address pointing to the fist byte of underling memory chunk. */ @@ -185,11 +196,4 @@ void release(final BufferLedger ledger) { * Release the underling memory chunk. */ abstract void release0(); - - /** - * Return the size of underlying chunk of memory managed by this Allocation Manager. - * - * @return size of underlying memory chunk - */ - abstract int getSize(); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java index 59454a47747..9d639080b79 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -34,7 +34,7 @@ public class NettyAllocationManager extends AllocationManager { private final UnsafeDirectLittleEndian memoryChunk; NettyAllocationManager(BaseAllocator accountingAllocator, int size) { - super(accountingAllocator); + super(accountingAllocator, size); this.memoryChunk = INNER_ALLOCATOR.allocate(size); this.size = memoryChunk.capacity(); } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 55e24726cc2..117253efff1 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -390,7 +390,7 @@ private RootAllocator createAllocatorWithCustomizedAllocationManager() { @Override protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { - return new AllocationManager(accountingAllocator) { + return new AllocationManager(accountingAllocator, size) { private final Unsafe unsafe = getUnsafe(); private final long address = unsafe.allocateMemory(size); @@ -405,11 +405,6 @@ void release0() { unsafe.freeMemory(address); } - @Override - int getSize() { - return size; - } - private Unsafe getUnsafe() { Field f = null; try { From 4ba779633b47032a809b26ecb0124a9716bffe0f Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 09:11:18 +0800 Subject: [PATCH 06/21] Inconsistency on creating AllocationManager between child and parent --- .../java/org/apache/arrow/memory/BaseAllocator.java | 12 +++++------- .../java/org/apache/arrow/memory/ChildAllocator.java | 6 +++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 1015bea678b..ba6fd1d7300 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -118,7 +118,7 @@ AllocationListener getListener() { } @Override - public BufferAllocator getParentAllocator() { + public BaseAllocator getParentAllocator() { return parentAllocator; } @@ -358,6 +358,9 @@ private AllocationManager newAllocationManager(int size) { return newAllocationManager(this, size); } + /** + * All {@link AllocationManager} instances created are defaulted to Netty implementation. + */ protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { return new NettyAllocationManager(accountingAllocator, size); } @@ -384,12 +387,7 @@ public BufferAllocator newChildAllocator( assertOpen(); final ChildAllocator childAllocator = - new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy) { - @Override - protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { - return BaseAllocator.this.newAllocationManager(accountingAllocator, size); - } - }; + new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy); if (DEBUG) { synchronized (DEBUG_LOCK) { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java index d6bac3d58f1..57b53f1eda3 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java @@ -53,5 +53,9 @@ class ChildAllocator extends BaseAllocator { super(parentAllocator, listener, name, initReservation, maxAllocation, roundingPolicy); } - + @Override + protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { + // to expect consistent behavior with parent on creating allocation managers + return getParentAllocator().newAllocationManager(accountingAllocator, size); + } } From a366e9943e4b4e341fbab227e657aa5f5d86de64 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 16:58:54 +0800 Subject: [PATCH 07/21] Address review comments --- .../main/java/org/apache/arrow/memory/BaseAllocator.java | 8 +++++++- .../main/java/org/apache/arrow/memory/BufferLedger.java | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index ba6fd1d7300..733309953a6 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -359,7 +359,13 @@ private AllocationManager newAllocationManager(int size) { } /** - * All {@link AllocationManager} instances created are defaulted to Netty implementation. + * By default all {@link ArrowBuf} instances created with {@link NettyAllocationManager}. + * One may extend this method to specify a user-defined AllocationManager implementation. + * + * @param accountingAllocator The allocator that are expected to be associated with newly created AllocationManager. + * Currently it is always equivalent to "this" + * @param size Size (in bytes) of memory managed by the AllocationManager + * @return The created AllocationManager used by this allocator */ protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { return new NettyAllocationManager(accountingAllocator, size); diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index c9f3dc8b215..b5ec42bbaad 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -140,7 +140,7 @@ public boolean release(int decrement) { * no ArrowBufs managed by this reference manager need access to the memory * chunk. In that case, the ledger should inform the allocation manager * about releasing its ownership for the chunk. Whether or not the memory - * chunk will be released is something that {@link NettyAllocationManager} will + * chunk will be released is something that {@link AllocationManager} will * decide since tracks the usage of memory chunk across multiple reference * managers and allocators. * From f9891d56155ea0cf7ba0587e3ef90ccbe4b493f1 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 17:00:38 +0800 Subject: [PATCH 08/21] Typo --- .../src/main/java/org/apache/arrow/memory/BaseAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 733309953a6..b7460bd67e1 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -359,7 +359,7 @@ private AllocationManager newAllocationManager(int size) { } /** - * By default all {@link ArrowBuf} instances created with {@link NettyAllocationManager}. + * By default all {@link ArrowBuf} instances are created with {@link NettyAllocationManager}. * One may extend this method to specify a user-defined AllocationManager implementation. * * @param accountingAllocator The allocator that are expected to be associated with newly created AllocationManager. From a4f96f3b02208e88292a7131bcde7e6cede4c15b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 17:42:08 +0800 Subject: [PATCH 09/21] Visibility --- .../java/org/apache/arrow/memory/AllocationManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 7086c1da987..4b0748c2e29 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -61,7 +61,7 @@ public abstract class AllocationManager { private volatile BufferLedger owningLedger; private volatile long amDestructionTime = 0; - AllocationManager(BaseAllocator accountingAllocator, int size) { + protected AllocationManager(BaseAllocator accountingAllocator, int size) { Preconditions.checkNotNull(accountingAllocator); accountingAllocator.assertOpen(); @@ -190,10 +190,10 @@ protected int getSize() { /** * Return the absolute memory address pointing to the fist byte of underling memory chunk. */ - abstract long memoryAddress(); + protected abstract long memoryAddress(); /** * Release the underling memory chunk. */ - abstract void release0(); + protected abstract void release0(); } From 13629f4cfa83d9c2b154d7f601b345edeaa488ba Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 17:44:47 +0800 Subject: [PATCH 10/21] Patch --- .../java/org/apache/arrow/memory/NettyAllocationManager.java | 2 +- .../test/java/org/apache/arrow/memory/TestBaseAllocator.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java index 9d639080b79..978cdfa3ff8 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -48,7 +48,7 @@ UnsafeDirectLittleEndian getMemoryChunk() { } @Override - long memoryAddress() { + protected long memoryAddress() { return memoryChunk.memoryAddress(); } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 117253efff1..92430e83e1c 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -395,12 +395,12 @@ protected AllocationManager newAllocationManager(BaseAllocator accountingAllocat private final long address = unsafe.allocateMemory(size); @Override - long memoryAddress() { + protected long memoryAddress() { return address; } @Override - void release0() { + protected void release0() { unsafe.setMemory(address, size, (byte) 0); unsafe.freeMemory(address); } From c049ab6e4b41f97f5109eb15a928deaea0a0696a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 17:51:37 +0800 Subject: [PATCH 11/21] getSize() was public, should keep the visibility --- .../main/java/org/apache/arrow/memory/AllocationManager.java | 2 +- .../java/org/apache/arrow/memory/NettyAllocationManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 4b0748c2e29..21ec6530119 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -183,7 +183,7 @@ void release(final BufferLedger ledger) { * * @return size of underlying memory chunk */ - protected int getSize() { + public int getSize() { return size; } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java index 978cdfa3ff8..8d102e43e5d 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -58,7 +58,7 @@ protected void release0() { } @Override - protected int getSize() { + public int getSize() { return size; } } From 98901aab9aefb301b8738ae43016027af311942b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Dec 2019 23:57:58 +0800 Subject: [PATCH 12/21] Use AllocationManager.Factory --- .../arrow/memory/AllocationManager.java | 17 ++ .../apache/arrow/memory/BaseAllocator.java | 190 +++++++++++++++--- .../apache/arrow/memory/ChildAllocator.java | 23 +-- .../arrow/memory/NettyAllocationManager.java | 14 ++ .../apache/arrow/memory/RootAllocator.java | 14 +- .../arrow/memory/TestBaseAllocator.java | 16 +- 6 files changed, 218 insertions(+), 56 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 21ec6530119..f35f4ae0452 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -196,4 +196,21 @@ public int getSize() { * Release the underling memory chunk. */ protected abstract void release0(); + + /** + * A factory interface for creating {@link AllocationManager}. + * One may extend this interface to use a user-defined AllocationManager implementation. + */ + public interface Factory { + + /** + * Create an {@link AllocationManager}. + * + * @param accountingAllocator The allocator that are expected to be associated with newly created AllocationManager. + * Currently it is always equivalent to "this" + * @param size Size (in bytes) of memory managed by the AllocationManager + * @return The created AllocationManager used by this allocator + */ + AllocationManager create(BaseAllocator accountingAllocator, int size); + } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index b7460bd67e1..781734204a5 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; +import org.apache.arrow.memory.rounding.DefaultRoundingPolicy; import org.apache.arrow.memory.rounding.RoundingPolicy; import org.apache.arrow.memory.util.AssertionUtil; import org.apache.arrow.memory.util.HistoricalLog; @@ -46,6 +47,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled() || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false")); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class); + public static final Config DEFAULT_CONFIG = new ConfigBuilder().create(); + // Package exposed for sharing between AllocatorManger and BaseAllocator objects final String name; final RootAllocator root; @@ -59,29 +62,28 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato private final IdentityHashMap childLedgers; private final IdentityHashMap reservations; private final HistoricalLog historicalLog; - private volatile boolean isClosed = false; // the allocator has been closed private final RoundingPolicy roundingPolicy; + private final AllocationManager.Factory allocationManagerFactory; + + private volatile boolean isClosed = false; // the allocator has been closed /** * Initialize an allocator * @param parentAllocator parent allocator. null if defining a root allocator - * @param listener listener callback. Must be non-null -- use - * {@link AllocationListener#NOOP} if no listener desired * @param name name of this allocator - * @param initReservation initial reservation. Cannot be modified after construction - * @param maxAllocation limit. Allocations past the limit fail. Can be modified after - * construction + * @param config configuration including other options of this allocator + * + * @see Config + * @see ConfigBuilder */ protected BaseAllocator( final BaseAllocator parentAllocator, - final AllocationListener listener, final String name, - final long initReservation, - final long maxAllocation, - final RoundingPolicy roundingPolicy) throws OutOfMemoryException { - super(parentAllocator, name, initReservation, maxAllocation); + final Config config) throws OutOfMemoryException { + super(parentAllocator, name, config.initReservation, config.maxAllocation); - this.listener = listener; + this.listener = config.listener; + this.allocationManagerFactory = config.allocationManagerFactory; if (parentAllocator != null) { this.root = parentAllocator.root; @@ -110,7 +112,7 @@ protected BaseAllocator( historicalLog = null; childLedgers = null; } - this.roundingPolicy = roundingPolicy; + this.roundingPolicy = config.roundingPolicy; } AllocationListener getListener() { @@ -358,17 +360,9 @@ private AllocationManager newAllocationManager(int size) { return newAllocationManager(this, size); } - /** - * By default all {@link ArrowBuf} instances are created with {@link NettyAllocationManager}. - * One may extend this method to specify a user-defined AllocationManager implementation. - * - * @param accountingAllocator The allocator that are expected to be associated with newly created AllocationManager. - * Currently it is always equivalent to "this" - * @param size Size (in bytes) of memory managed by the AllocationManager - * @return The created AllocationManager used by this allocator - */ - protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { - return new NettyAllocationManager(accountingAllocator, size); + + private AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { + return allocationManagerFactory.create(accountingAllocator, size); } @Override @@ -393,7 +387,13 @@ public BufferAllocator newChildAllocator( assertOpen(); final ChildAllocator childAllocator = - new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy); + new ChildAllocator(this, name, configBuilder() + .listener(listener) + .initReservation(initReservation) + .maxAllocation(maxAllocation) + .roundingPolicy(roundingPolicy) + .allocationManagerFactory(allocationManagerFactory) + .create()); if (DEBUG) { synchronized (DEBUG_LOCK) { @@ -745,6 +745,146 @@ public enum Verbosity { } } + /** + * Returns a default {@link Config} instance. + * + * @see ConfigBuilder + */ + public static Config defaultConfig() { + return DEFAULT_CONFIG; + + } + + /** + * Returns a builder class for configuring BaseAllocator's options. + */ + public static ConfigBuilder configBuilder() { + return new ConfigBuilder(); + } + + /** + * Config class of {@link BaseAllocator}. + */ + protected static class Config { + protected final AllocationManager.Factory allocationManagerFactory; + protected final AllocationListener listener; + protected final long initReservation; + protected final long maxAllocation; + protected final RoundingPolicy roundingPolicy; + + /** + * @param allocationManagerFactory factory for creating {@link AllocationManager} instances + * @param listener listener callback. Must be non-null + * @param initReservation initial reservation. Cannot be modified after construction + * @param maxAllocation limit. Allocations past the limit fail. Can be modified after + * construction + * @param roundingPolicy the policy for rounding the buffer size + */ + private Config(AllocationManager.Factory allocationManagerFactory, + AllocationListener listener, + long initReservation, + long maxAllocation, + RoundingPolicy roundingPolicy) { + this.allocationManagerFactory = allocationManagerFactory; + this.listener = listener; + this.initReservation = initReservation; + this.maxAllocation = maxAllocation; + this.roundingPolicy = roundingPolicy; + } + + private Config(ConfigBuilder builder) { + this(builder.allocationManagerFactory, + builder.listener, + builder.initReservation, + builder.maxAllocation, + builder.roundingPolicy); + } + } + + /** + * Builder class for {@link Config}. + */ + public static class ConfigBuilder { + private AllocationManager.Factory allocationManagerFactory = NettyAllocationManager.FACTORY; + private AllocationListener listener = AllocationListener.NOOP; + private long initReservation = 0; + private long maxAllocation = Long.MAX_VALUE; + private RoundingPolicy roundingPolicy = DefaultRoundingPolicy.INSTANCE; + + private ConfigBuilder() { + } + + /** + * Specify a factory for creating {@link AllocationManager} instances. + * + *

Default value: NettyAllocationManager.FACTORY + * + * @return this ConfigBuilder + * @see AllocationManager + */ + public ConfigBuilder allocationManagerFactory(AllocationManager.Factory allocationManagerFactory) { + this.allocationManagerFactory = allocationManagerFactory; + return this; + } + + /** + * Specify a listener callback. Must be non-null. + * + *

Default value: AllocationListener.NOOP + * + * @return this ConfigBuilder + * @see AllocationListener + */ + public ConfigBuilder listener(AllocationListener listener) { + this.listener = listener; + return this; + } + + /** + * Specify the initial reservation size (in bytes) for this allocator. + * + *

Default value: 0 + * + * @return this ConfigBuilder + */ + public ConfigBuilder initReservation(long initReservation) { + this.initReservation = initReservation; + return this; + } + + /** + * Specify the max allocation size (in bytes) for this allocator. + * + *

Default value: Long.MAX_VALUE + * + * @return this ConfigBuilder + */ + public ConfigBuilder maxAllocation(long maxAllocation) { + this.maxAllocation = maxAllocation; + return this; + } + + /** + * Specify a {@link RoundingPolicy} instance for rounding the buffer size + * + *

Default value: DefaultRoundingPolicy.INSTANCE + * + * @return this ConfigBuilder + * @see RoundingPolicy + */ + public ConfigBuilder roundingPolicy(RoundingPolicy roundingPolicy) { + this.roundingPolicy = roundingPolicy; + return this; + } + + /** + * Create the {@link Config} instance. + */ + public Config create() { + return new Config(this); + } + } + /** * Implementation of {@link AllocationReservation} that supports * history tracking under {@linkplain #DEBUG} is true. diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java index 57b53f1eda3..67156f89d13 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java @@ -18,8 +18,6 @@ package org.apache.arrow.memory; -import org.apache.arrow.memory.rounding.RoundingPolicy; - /** * Child allocator class. Only slightly different from the {@see RootAllocator}, * in that these can't be created directly, but must be obtained from @@ -33,29 +31,14 @@ class ChildAllocator extends BaseAllocator { /** * Constructor. * - * @param listener Allocation listener to be used in this child * @param parentAllocator parent allocator -- the one creating this child * @param name the name of this child allocator - * @param initReservation initial amount of space to reserve (obtained from the parent) - * @param maxAllocation maximum amount of space that can be obtained from this allocator; note - * this includes direct allocations (via {@see BufferAllocator#buffer(int, - *int)} et al) and requests from descendant allocators. Depending on the - * allocation policy in force, even less memory may be available - * @param roundingPolicy the policy for rounding requested buffer size + * @param config configuration of this child allocator */ ChildAllocator( - AllocationListener listener, BaseAllocator parentAllocator, String name, - long initReservation, - long maxAllocation, - RoundingPolicy roundingPolicy) { - super(parentAllocator, listener, name, initReservation, maxAllocation, roundingPolicy); - } - - @Override - protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { - // to expect consistent behavior with parent on creating allocation managers - return getParentAllocator().newAllocationManager(accountingAllocator, size); + Config config) { + super(parentAllocator, name, config); } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java index 8d102e43e5d..6e7a1065f99 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -26,6 +26,8 @@ */ public class NettyAllocationManager extends AllocationManager { + public static final Factory FACTORY = new Factory(); + private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); @@ -61,4 +63,16 @@ protected void release0() { public int getSize() { return size; } + + /** + * Factory for creating {@link NettyAllocationManager}. + */ + public static class Factory implements AllocationManager.Factory { + private Factory() {} + + @Override + public AllocationManager create(BaseAllocator accountingAllocator, int size) { + return new NettyAllocationManager(accountingAllocator, size); + } + } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index 02e27ab2822..fc24fa874e0 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -40,8 +40,20 @@ public RootAllocator(final AllocationListener listener, final long limit) { this(listener, limit, DefaultRoundingPolicy.INSTANCE); } + /** + * Constructor. + * + * @param listener the allocation listener + * @param limit max allocation size in bytes + * @param roundingPolicy the policy for rounding the buffer size + */ public RootAllocator(final AllocationListener listener, final long limit, RoundingPolicy roundingPolicy) { - super(null, listener, "ROOT", 0, limit, roundingPolicy); + super(null, "ROOT", + configBuilder() + .listener(listener) + .maxAllocation(limit) + .roundingPolicy(roundingPolicy) + .create()); } /** diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 92430e83e1c..359d0732ba5 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -360,7 +360,7 @@ public void testSegmentAllocator_segmentSizeNotPowerOf2() { @Test public void testCustomizedAllocationManager() { - try (RootAllocator allocator = createAllocatorWithCustomizedAllocationManager()) { + try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) { final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); assertNotNull("allocation failed", arrowBuf1); @@ -384,13 +384,9 @@ public void testCustomizedAllocationManager() { } } - private RootAllocator createAllocatorWithCustomizedAllocationManager() { - return new RootAllocator(MAX_ALLOCATION) { - - @Override - protected AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) { - - return new AllocationManager(accountingAllocator, size) { + private BaseAllocator createAllocatorWithCustomizedAllocationManager() { + return new BaseAllocator(null, "ROOT", BaseAllocator.configBuilder() + .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { private final Unsafe unsafe = getUnsafe(); private final long address = unsafe.allocateMemory(size); @@ -419,8 +415,8 @@ private Unsafe getUnsafe() { } } } - }; - } + }).create()) { + }; } From 101d529708813aa500738378c45815b3085a0b47 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sat, 7 Dec 2019 00:12:29 +0800 Subject: [PATCH 13/21] Pass config to root allocator --- .../org/apache/arrow/memory/RootAllocator.java | 16 ++++++++++------ .../apache/arrow/memory/TestBaseAllocator.java | 6 ++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index fc24fa874e0..e490e3a006f 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -48,12 +48,16 @@ public RootAllocator(final AllocationListener listener, final long limit) { * @param roundingPolicy the policy for rounding the buffer size */ public RootAllocator(final AllocationListener listener, final long limit, RoundingPolicy roundingPolicy) { - super(null, "ROOT", - configBuilder() - .listener(listener) - .maxAllocation(limit) - .roundingPolicy(roundingPolicy) - .create()); + this(configBuilder() + .listener(listener) + .maxAllocation(limit) + .roundingPolicy(roundingPolicy) + .create() + ); + } + + public RootAllocator(Config config) { + super(null, "ROOT", config); } /** diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 359d0732ba5..59b56589d57 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -385,7 +385,7 @@ public void testCustomizedAllocationManager() { } private BaseAllocator createAllocatorWithCustomizedAllocationManager() { - return new BaseAllocator(null, "ROOT", BaseAllocator.configBuilder() + return new RootAllocator(BaseAllocator.configBuilder() .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { private final Unsafe unsafe = getUnsafe(); private final long address = unsafe.allocateMemory(size); @@ -415,9 +415,7 @@ private Unsafe getUnsafe() { } } } - }).create()) { - - }; + }).create()); } // Allocation listener From 96e3e521d1a6c329b4eb2dc7a510bd578bfabd9b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sat, 7 Dec 2019 00:19:38 +0800 Subject: [PATCH 14/21] Oops --- .../src/test/java/org/apache/arrow/memory/TestBaseAllocator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 59b56589d57..ca957a2780c 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -386,6 +386,7 @@ public void testCustomizedAllocationManager() { private BaseAllocator createAllocatorWithCustomizedAllocationManager() { return new RootAllocator(BaseAllocator.configBuilder() + .maxAllocation(MAX_ALLOCATION) .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { private final Unsafe unsafe = getUnsafe(); private final long address = unsafe.allocateMemory(size); From 8a54af43c328713caf029050a0cb208cf15fc64d Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 11 Dec 2019 14:15:54 +0800 Subject: [PATCH 15/21] Visibility change --- .../java/org/apache/arrow/memory/AllocationManager.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index f35f4ae0452..5bce4407b35 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -93,7 +93,14 @@ BufferLedger associate(final BaseAllocator allocator) { return associate(allocator, true); } - private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { + /** + * Associate the existing underlying buffer with a new allocator. + * + * @param allocator The target allocator to associate this buffer with. + * @param retain True for increasing the ref count by 1 + * @return size of underlying memory chunk + */ + protected final BufferLedger associate(final BaseAllocator allocator, final boolean retain) { allocator.assertOpen(); Preconditions.checkState(root == allocator.root, "A buffer can only be associated between two allocators that share the same root"); From 1e7f21bb61daba8bcaeb3e64311b4fe3a25153d5 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 11 Dec 2019 14:42:27 +0800 Subject: [PATCH 16/21] Use existing test cases in TestBaseAllocator --- .../arrow/memory/TestBaseAllocator.java | 128 +++++------------- .../TestCustomizedAllocationManager.java | 96 +++++++++++++ 2 files changed, 132 insertions(+), 92 deletions(-) create mode 100644 java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index ca957a2780c..8897c274069 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -18,11 +18,14 @@ package org.apache.arrow.memory; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -36,7 +39,6 @@ import org.junit.jupiter.api.Assertions; import io.netty.buffer.ArrowBuf; -import sun.misc.Unsafe; public class TestBaseAllocator { @@ -63,12 +65,11 @@ public void checkBuffers() { // ---------------------------------------- DEBUG ------------------------------------ */ - - + @Test public void test_privateMax() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); assertNotNull("allocation failed", arrowBuf1); @@ -87,7 +88,7 @@ public void test_privateMax() throws Exception { public void testRootAllocator_closeWithOutstanding() throws Exception { try { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { final ArrowBuf arrowBuf = rootAllocator.buffer(512); assertNotNull("allocation failed", arrowBuf); } @@ -109,7 +110,7 @@ public void testRootAllocator_closeWithOutstanding() throws Exception { @Test public void testRootAllocator_getEmpty() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { final ArrowBuf arrowBuf = rootAllocator.buffer(0); assertNotNull("allocation failed", arrowBuf); assertEquals("capacity was non-zero", 0, arrowBuf.capacity()); @@ -122,7 +123,7 @@ public void testRootAllocator_getEmpty() throws Exception { @Test(expected = IllegalStateException.class) public void testAllocator_unreleasedEmpty() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { @SuppressWarnings("unused") final ArrowBuf arrowBuf = rootAllocator.buffer(0); } @@ -131,7 +132,7 @@ public void testAllocator_unreleasedEmpty() throws Exception { @Test public void testAllocator_transferOwnership() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = @@ -161,7 +162,7 @@ static boolean equalsIgnoreOrder(Collection c1, Collection c2) { @Test public void testAllocator_getParentAndChild() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { assertEquals(rootAllocator.getParentAllocator(), null); try (final BufferAllocator childAllocator1 = @@ -189,7 +190,7 @@ public void testAllocator_getParentAndChild() throws Exception { @Test public void testAllocator_childRemovedOnClose() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) { try (final BufferAllocator childAllocator2 = @@ -222,7 +223,7 @@ public void testAllocator_childRemovedOnClose() throws Exception { @Test public void testAllocator_shareOwnership() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0, @@ -266,7 +267,7 @@ public void testAllocator_shareOwnership() throws Exception { @Test public void testRootAllocator_createChildAndUse() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator( "createChildAndUse", 0, MAX_ALLOCATION)) { final ArrowBuf arrowBuf = childAllocator.buffer(512); @@ -279,7 +280,7 @@ public void testRootAllocator_createChildAndUse() throws Exception { @Test(expected = IllegalStateException.class) public void testRootAllocator_createChildDontClose() throws Exception { try { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator = rootAllocator.newChildAllocator( "createChildDontClose", 0, MAX_ALLOCATION); final ArrowBuf arrowBuf = childAllocator.buffer(512); @@ -358,67 +359,6 @@ public void testSegmentAllocator_segmentSizeNotPowerOf2() { assertEquals("The segment size must be a power of 2", e.getMessage()); } - @Test - public void testCustomizedAllocationManager() { - try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) { - final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); - assertNotNull("allocation failed", arrowBuf1); - - arrowBuf1.setInt(0, 1); - assertEquals(1, arrowBuf1.getInt(0)); - - try { - final ArrowBuf arrowBuf2 = allocator.buffer(1); - fail("allocated memory beyond max allowed"); - } catch (OutOfMemoryException e) { - // expected - } - arrowBuf1.getReferenceManager().release(); - - try { - arrowBuf1.getInt(0); - fail("data read from released buffer"); - } catch (RuntimeException e) { - // expected - } - } - } - - private BaseAllocator createAllocatorWithCustomizedAllocationManager() { - return new RootAllocator(BaseAllocator.configBuilder() - .maxAllocation(MAX_ALLOCATION) - .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { - private final Unsafe unsafe = getUnsafe(); - private final long address = unsafe.allocateMemory(size); - - @Override - protected long memoryAddress() { - return address; - } - - @Override - protected void release0() { - unsafe.setMemory(address, size, (byte) 0); - unsafe.freeMemory(address); - } - - private Unsafe getUnsafe() { - Field f = null; - try { - f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); - } finally { - if (f != null) { - f.setAccessible(false); - } - } - } - }).create()); - } - // Allocation listener // It counts the number of times it has been invoked, and how much memory allocation it has seen // When set to 'expand on fail', it attempts to expand the associated allocator's limit @@ -577,7 +517,7 @@ public void testRootAllocator_listenerAllocationFail() throws Exception { // Test attempts to allocate too much from a child whose limit is set to half of the max // allocation. The listener's callback triggers, expanding the child allocator's limit, so then // the allocation succeeds. - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1, 0, MAX_ALLOCATION / 2)) { try { @@ -623,7 +563,7 @@ private static void allocateAndFree(final BufferAllocator allocator) { @Test public void testAllocator_manyAllocations() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) { allocateAndFree(childAllocator); @@ -634,7 +574,7 @@ public void testAllocator_manyAllocations() throws Exception { @Test public void testAllocator_overAllocate() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) { allocateAndFree(childAllocator); @@ -652,7 +592,7 @@ public void testAllocator_overAllocate() throws Exception { @Test public void testAllocator_overAllocateParent() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) { final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); @@ -676,7 +616,7 @@ public void testAllocator_overAllocateParent() throws Exception { @Test public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("child", 0, MAX_ALLOCATION / 2)) { try (final BufferAllocator grandChildAllocator = @@ -713,7 +653,7 @@ public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception @Test public void testAllocator_failureAtRootLimitOutcomeDetails() throws Exception { try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("child", MAX_ALLOCATION / 2, Long.MAX_VALUE)) { try (final BufferAllocator grandChildAllocator = @@ -769,7 +709,7 @@ private static void testAllocator_sliceUpBufferAndRelease( @Test public void testAllocator_createSlices() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator); try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, @@ -803,7 +743,7 @@ public void testAllocator_createSlices() throws Exception { public void testAllocator_sliceRanges() throws Exception { // final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges"); try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { // Populate a buffer with byte values corresponding to their indices. final ArrowBuf arrowBuf = rootAllocator.buffer(256); assertEquals(256, arrowBuf.capacity()); @@ -862,7 +802,7 @@ public void testAllocator_sliceRanges() throws Exception { public void testAllocator_slicesOfSlices() throws Exception { // final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices"); try (final RootAllocator rootAllocator = - new RootAllocator(MAX_ALLOCATION)) { + createRootAllocator(MAX_ALLOCATION)) { // Populate a buffer with byte values corresponding to their indices. final ArrowBuf arrowBuf = rootAllocator.buffer(256); for (int i = 0; i < 256; ++i) { @@ -896,7 +836,7 @@ public void testAllocator_slicesOfSlices() throws Exception { @Test public void testAllocator_transferSliced() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION); @@ -928,7 +868,7 @@ public void testAllocator_transferSliced() throws Exception { @Test public void testAllocator_shareSliced() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); @@ -960,7 +900,7 @@ public void testAllocator_shareSliced() throws Exception { @Test public void testAllocator_transferShared() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION); final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION); @@ -1013,7 +953,7 @@ public void testAllocator_transferShared() throws Exception { @Test public void testAllocator_unclaimedReservation() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) { try (final AllocationReservation reservation = childAllocator1.newReservation()) { @@ -1026,7 +966,7 @@ public void testAllocator_unclaimedReservation() throws Exception { @Test public void testAllocator_claimedReservation() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator( "claimedReservation", 0, MAX_ALLOCATION)) { @@ -1049,7 +989,7 @@ public void testAllocator_claimedReservation() throws Exception { @Test public void testInitReservationAndLimit() throws Exception { - try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator( "child", 2048, 4096)) { assertEquals(2048, childAllocator.getInitReservation()); @@ -1061,7 +1001,7 @@ public void testInitReservationAndLimit() throws Exception { @Test public void multiple() throws Exception { final String owner = "test"; - try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + try (RootAllocator allocator = createRootAllocator(Long.MAX_VALUE)) { final int op = 100000; @@ -1127,4 +1067,8 @@ public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); } + + protected RootAllocator createRootAllocator(long maxAllocation) { + return new RootAllocator(maxAllocation); + } } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java b/java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java new file mode 100644 index 00000000000..3f532e1b42b --- /dev/null +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.lang.reflect.Field; + +import org.junit.Test; + +import io.netty.buffer.ArrowBuf; +import sun.misc.Unsafe; + +public class TestCustomizedAllocationManager extends TestBaseAllocator { + + private static final int MAX_ALLOCATION = 8 * 1024; + + @Test + public void testCustomizedAllocationManager() { + try (final BaseAllocator allocator = createRootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf1); + + arrowBuf1.setInt(0, 1); + assertEquals(1, arrowBuf1.getInt(0)); + + try { + final ArrowBuf arrowBuf2 = allocator.buffer(1); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + + arrowBuf1.getReferenceManager().release(); + try { + arrowBuf1.getInt(0); + fail("data read from released buffer"); + } catch (RuntimeException e) { + // expected + } + } + } + + @Override + protected RootAllocator createRootAllocator(long maxAllocation) { + return new RootAllocator(BaseAllocator.configBuilder() + .maxAllocation(maxAllocation) + .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { + private final Unsafe unsafe = getUnsafe(); + private final long address = unsafe.allocateMemory(size); + + @Override + protected long memoryAddress() { + return address; + } + + @Override + protected void release0() { + unsafe.setMemory(address, size, (byte) 0); + unsafe.freeMemory(address); + } + + private Unsafe getUnsafe() { + Field f = null; + try { + f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } finally { + if (f != null) { + f.setAccessible(false); + } + } + } + }).create()); + } +} From 1309a08d5a112b5e8f009c5365c195d8cf0084a6 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 11 Dec 2019 17:36:30 +0800 Subject: [PATCH 17/21] Revert "Visibility change" This reverts commit 8a54af43c328713caf029050a0cb208cf15fc64d. --- .../java/org/apache/arrow/memory/AllocationManager.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 5bce4407b35..f35f4ae0452 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -93,14 +93,7 @@ BufferLedger associate(final BaseAllocator allocator) { return associate(allocator, true); } - /** - * Associate the existing underlying buffer with a new allocator. - * - * @param allocator The target allocator to associate this buffer with. - * @param retain True for increasing the ref count by 1 - * @return size of underlying memory chunk - */ - protected final BufferLedger associate(final BaseAllocator allocator, final boolean retain) { + private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { allocator.assertOpen(); Preconditions.checkState(root == allocator.root, "A buffer can only be associated between two allocators that share the same root"); From a38853ff09f9d5185e8536f9dada1af65ca7ba57 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 11 Dec 2019 20:57:41 +0800 Subject: [PATCH 18/21] Revert "Use existing test cases in TestBaseAllocator" This reverts commit 1e7f21bb61daba8bcaeb3e64311b4fe3a25153d5. --- .../arrow/memory/TestBaseAllocator.java | 128 +++++++++++++----- .../TestCustomizedAllocationManager.java | 96 ------------- 2 files changed, 92 insertions(+), 132 deletions(-) delete mode 100644 java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 8897c274069..ca957a2780c 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -18,14 +18,11 @@ package org.apache.arrow.memory; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -39,6 +36,7 @@ import org.junit.jupiter.api.Assertions; import io.netty.buffer.ArrowBuf; +import sun.misc.Unsafe; public class TestBaseAllocator { @@ -65,11 +63,12 @@ public void checkBuffers() { // ---------------------------------------- DEBUG ------------------------------------ */ - + + @Test public void test_privateMax() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); assertNotNull("allocation failed", arrowBuf1); @@ -88,7 +87,7 @@ public void test_privateMax() throws Exception { public void testRootAllocator_closeWithOutstanding() throws Exception { try { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { final ArrowBuf arrowBuf = rootAllocator.buffer(512); assertNotNull("allocation failed", arrowBuf); } @@ -110,7 +109,7 @@ public void testRootAllocator_closeWithOutstanding() throws Exception { @Test public void testRootAllocator_getEmpty() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { final ArrowBuf arrowBuf = rootAllocator.buffer(0); assertNotNull("allocation failed", arrowBuf); assertEquals("capacity was non-zero", 0, arrowBuf.capacity()); @@ -123,7 +122,7 @@ public void testRootAllocator_getEmpty() throws Exception { @Test(expected = IllegalStateException.class) public void testAllocator_unreleasedEmpty() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { @SuppressWarnings("unused") final ArrowBuf arrowBuf = rootAllocator.buffer(0); } @@ -132,7 +131,7 @@ public void testAllocator_unreleasedEmpty() throws Exception { @Test public void testAllocator_transferOwnership() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = @@ -162,7 +161,7 @@ static boolean equalsIgnoreOrder(Collection c1, Collection c2) { @Test public void testAllocator_getParentAndChild() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { assertEquals(rootAllocator.getParentAllocator(), null); try (final BufferAllocator childAllocator1 = @@ -190,7 +189,7 @@ public void testAllocator_getParentAndChild() throws Exception { @Test public void testAllocator_childRemovedOnClose() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) { try (final BufferAllocator childAllocator2 = @@ -223,7 +222,7 @@ public void testAllocator_childRemovedOnClose() throws Exception { @Test public void testAllocator_shareOwnership() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0, @@ -267,7 +266,7 @@ public void testAllocator_shareOwnership() throws Exception { @Test public void testRootAllocator_createChildAndUse() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator( "createChildAndUse", 0, MAX_ALLOCATION)) { final ArrowBuf arrowBuf = childAllocator.buffer(512); @@ -280,7 +279,7 @@ public void testRootAllocator_createChildAndUse() throws Exception { @Test(expected = IllegalStateException.class) public void testRootAllocator_createChildDontClose() throws Exception { try { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator = rootAllocator.newChildAllocator( "createChildDontClose", 0, MAX_ALLOCATION); final ArrowBuf arrowBuf = childAllocator.buffer(512); @@ -359,6 +358,67 @@ public void testSegmentAllocator_segmentSizeNotPowerOf2() { assertEquals("The segment size must be a power of 2", e.getMessage()); } + @Test + public void testCustomizedAllocationManager() { + try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) { + final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf1); + + arrowBuf1.setInt(0, 1); + assertEquals(1, arrowBuf1.getInt(0)); + + try { + final ArrowBuf arrowBuf2 = allocator.buffer(1); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + arrowBuf1.getReferenceManager().release(); + + try { + arrowBuf1.getInt(0); + fail("data read from released buffer"); + } catch (RuntimeException e) { + // expected + } + } + } + + private BaseAllocator createAllocatorWithCustomizedAllocationManager() { + return new RootAllocator(BaseAllocator.configBuilder() + .maxAllocation(MAX_ALLOCATION) + .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { + private final Unsafe unsafe = getUnsafe(); + private final long address = unsafe.allocateMemory(size); + + @Override + protected long memoryAddress() { + return address; + } + + @Override + protected void release0() { + unsafe.setMemory(address, size, (byte) 0); + unsafe.freeMemory(address); + } + + private Unsafe getUnsafe() { + Field f = null; + try { + f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } finally { + if (f != null) { + f.setAccessible(false); + } + } + } + }).create()); + } + // Allocation listener // It counts the number of times it has been invoked, and how much memory allocation it has seen // When set to 'expand on fail', it attempts to expand the associated allocator's limit @@ -517,7 +577,7 @@ public void testRootAllocator_listenerAllocationFail() throws Exception { // Test attempts to allocate too much from a child whose limit is set to half of the max // allocation. The listener's callback triggers, expanding the child allocator's limit, so then // the allocation succeeds. - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1, 0, MAX_ALLOCATION / 2)) { try { @@ -563,7 +623,7 @@ private static void allocateAndFree(final BufferAllocator allocator) { @Test public void testAllocator_manyAllocations() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) { allocateAndFree(childAllocator); @@ -574,7 +634,7 @@ public void testAllocator_manyAllocations() throws Exception { @Test public void testAllocator_overAllocate() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) { allocateAndFree(childAllocator); @@ -592,7 +652,7 @@ public void testAllocator_overAllocate() throws Exception { @Test public void testAllocator_overAllocateParent() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) { final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); @@ -616,7 +676,7 @@ public void testAllocator_overAllocateParent() throws Exception { @Test public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("child", 0, MAX_ALLOCATION / 2)) { try (final BufferAllocator grandChildAllocator = @@ -653,7 +713,7 @@ public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception @Test public void testAllocator_failureAtRootLimitOutcomeDetails() throws Exception { try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("child", MAX_ALLOCATION / 2, Long.MAX_VALUE)) { try (final BufferAllocator grandChildAllocator = @@ -709,7 +769,7 @@ private static void testAllocator_sliceUpBufferAndRelease( @Test public void testAllocator_createSlices() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator); try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, @@ -743,7 +803,7 @@ public void testAllocator_createSlices() throws Exception { public void testAllocator_sliceRanges() throws Exception { // final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges"); try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { // Populate a buffer with byte values corresponding to their indices. final ArrowBuf arrowBuf = rootAllocator.buffer(256); assertEquals(256, arrowBuf.capacity()); @@ -802,7 +862,7 @@ public void testAllocator_sliceRanges() throws Exception { public void testAllocator_slicesOfSlices() throws Exception { // final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices"); try (final RootAllocator rootAllocator = - createRootAllocator(MAX_ALLOCATION)) { + new RootAllocator(MAX_ALLOCATION)) { // Populate a buffer with byte values corresponding to their indices. final ArrowBuf arrowBuf = rootAllocator.buffer(256); for (int i = 0; i < 256; ++i) { @@ -836,7 +896,7 @@ public void testAllocator_slicesOfSlices() throws Exception { @Test public void testAllocator_transferSliced() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION); @@ -868,7 +928,7 @@ public void testAllocator_transferSliced() throws Exception { @Test public void testAllocator_shareSliced() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); @@ -900,7 +960,7 @@ public void testAllocator_shareSliced() throws Exception { @Test public void testAllocator_transferShared() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION); final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION); final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION); @@ -953,7 +1013,7 @@ public void testAllocator_transferShared() throws Exception { @Test public void testAllocator_unclaimedReservation() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) { try (final AllocationReservation reservation = childAllocator1.newReservation()) { @@ -966,7 +1026,7 @@ public void testAllocator_unclaimedReservation() throws Exception { @Test public void testAllocator_claimedReservation() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator( "claimedReservation", 0, MAX_ALLOCATION)) { @@ -989,7 +1049,7 @@ public void testAllocator_claimedReservation() throws Exception { @Test public void testInitReservationAndLimit() throws Exception { - try (final RootAllocator rootAllocator = createRootAllocator(MAX_ALLOCATION)) { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator( "child", 2048, 4096)) { assertEquals(2048, childAllocator.getInitReservation()); @@ -1001,7 +1061,7 @@ public void testInitReservationAndLimit() throws Exception { @Test public void multiple() throws Exception { final String owner = "test"; - try (RootAllocator allocator = createRootAllocator(Long.MAX_VALUE)) { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { final int op = 100000; @@ -1067,8 +1127,4 @@ public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); } - - protected RootAllocator createRootAllocator(long maxAllocation) { - return new RootAllocator(maxAllocation); - } } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java b/java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java deleted file mode 100644 index 3f532e1b42b..00000000000 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestCustomizedAllocationManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.memory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - -import java.lang.reflect.Field; - -import org.junit.Test; - -import io.netty.buffer.ArrowBuf; -import sun.misc.Unsafe; - -public class TestCustomizedAllocationManager extends TestBaseAllocator { - - private static final int MAX_ALLOCATION = 8 * 1024; - - @Test - public void testCustomizedAllocationManager() { - try (final BaseAllocator allocator = createRootAllocator(MAX_ALLOCATION)) { - final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION); - assertNotNull("allocation failed", arrowBuf1); - - arrowBuf1.setInt(0, 1); - assertEquals(1, arrowBuf1.getInt(0)); - - try { - final ArrowBuf arrowBuf2 = allocator.buffer(1); - fail("allocated memory beyond max allowed"); - } catch (OutOfMemoryException e) { - // expected - } - - arrowBuf1.getReferenceManager().release(); - try { - arrowBuf1.getInt(0); - fail("data read from released buffer"); - } catch (RuntimeException e) { - // expected - } - } - } - - @Override - protected RootAllocator createRootAllocator(long maxAllocation) { - return new RootAllocator(BaseAllocator.configBuilder() - .maxAllocation(maxAllocation) - .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) { - private final Unsafe unsafe = getUnsafe(); - private final long address = unsafe.allocateMemory(size); - - @Override - protected long memoryAddress() { - return address; - } - - @Override - protected void release0() { - unsafe.setMemory(address, size, (byte) 0); - unsafe.freeMemory(address); - } - - private Unsafe getUnsafe() { - Field f = null; - try { - f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); - } finally { - if (f != null) { - f.setAccessible(false); - } - } - } - }).create()); - } -} From 214230e2ce7bbb31f105956a747ca2579df1c601 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 11 Dec 2019 21:38:55 +0800 Subject: [PATCH 19/21] Add method BufferLedger#unwrap --- .../org/apache/arrow/memory/BufferLedger.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index b5ec42bbaad..81440d04440 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -27,6 +27,7 @@ import org.apache.arrow.util.Preconditions; import io.netty.buffer.ArrowBuf; +import io.netty.buffer.UnsafeDirectLittleEndian; /** * The reference manager that binds an {@link AllocationManager} to @@ -522,7 +523,41 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { } } + /** + * @deprecated Use #unwrap(UnsafeDirectLittleEndian.class) instead. + */ + @Deprecated + public UnsafeDirectLittleEndian getUnderlying() { + return unwrap(UnsafeDirectLittleEndian.class); + } + + /** + * Get the {@link AllocationManager} used by this BufferLedger. + * + * @return The AllocationManager used by this BufferLedger. + */ public AllocationManager getAllocationManager() { return allocationManager; } + + /** + * Return the {@link AllocationManager} used or underlying {@link UnsafeDirectLittleEndian} instance + * (in the case of we use a {@link NettyAllocationManager}), and cast to desired class. + * + * @param clazz The desired class to cast into + * @return The AllocationManager used by this BufferLedger, or the underlying UnsafeDirectLittleEndian object. + */ + public T unwrap(Class clazz) { + if (clazz.isInstance(allocationManager)) { + return clazz.cast(allocationManager); + } + + if (clazz == UnsafeDirectLittleEndian.class) { + Preconditions.checkState(allocationManager instanceof NettyAllocationManager, + "Underlying memory was not allocated by Netty"); + return clazz.cast(((NettyAllocationManager) allocationManager).getMemoryChunk()); + } + + throw new IllegalArgumentException("Unexpected unwrapping class: " + clazz); + } } From 81b10764b7282db237c9d61f739b6c0c67032397 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 11 Dec 2019 23:23:59 +0800 Subject: [PATCH 20/21] Introduce Immutables for config builder --- java/memory/pom.xml | 4 + .../apache/arrow/memory/BaseAllocator.java | 142 ++++-------------- .../org/apache/arrow/memory/BufferLedger.java | 2 +- .../apache/arrow/memory/RootAllocator.java | 2 +- .../arrow/memory/TestBaseAllocator.java | 2 +- java/pom.xml | 11 ++ 6 files changed, 51 insertions(+), 112 deletions(-) diff --git a/java/memory/pom.xml b/java/memory/pom.xml index 4a99ec78c42..3cceb9e7cef 100644 --- a/java/memory/pom.xml +++ b/java/memory/pom.xml @@ -37,6 +37,10 @@ org.slf4j slf4j-api + + org.immutables + value + diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 781734204a5..efcbd546efc 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -30,6 +30,7 @@ import org.apache.arrow.memory.util.AssertionUtil; import org.apache.arrow.memory.util.HistoricalLog; import org.apache.arrow.util.Preconditions; +import org.immutables.value.Value; import io.netty.buffer.ArrowBuf; import io.netty.util.internal.OutOfDirectMemoryError; @@ -47,7 +48,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled() || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false")); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class); - public static final Config DEFAULT_CONFIG = new ConfigBuilder().create(); + public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build(); // Package exposed for sharing between AllocatorManger and BaseAllocator objects final String name; @@ -74,16 +75,15 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato * @param config configuration including other options of this allocator * * @see Config - * @see ConfigBuilder */ protected BaseAllocator( final BaseAllocator parentAllocator, final String name, final Config config) throws OutOfMemoryException { - super(parentAllocator, name, config.initReservation, config.maxAllocation); + super(parentAllocator, name, config.getInitReservation(), config.getMaxAllocation()); - this.listener = config.listener; - this.allocationManagerFactory = config.allocationManagerFactory; + this.listener = config.getListener(); + this.allocationManagerFactory = config.getAllocationManagerFactory(); if (parentAllocator != null) { this.root = parentAllocator.root; @@ -112,7 +112,7 @@ protected BaseAllocator( historicalLog = null; childLedgers = null; } - this.roundingPolicy = config.roundingPolicy; + this.roundingPolicy = config.getRoundingPolicy(); } AllocationListener getListener() { @@ -393,7 +393,7 @@ public BufferAllocator newChildAllocator( .maxAllocation(maxAllocation) .roundingPolicy(roundingPolicy) .allocationManagerFactory(allocationManagerFactory) - .create()); + .build()); if (DEBUG) { synchronized (DEBUG_LOCK) { @@ -748,7 +748,7 @@ public enum Verbosity { /** * Returns a default {@link Config} instance. * - * @see ConfigBuilder + * @see ImmutableConfig.Builder */ public static Config defaultConfig() { return DEFAULT_CONFIG; @@ -758,130 +758,54 @@ public static Config defaultConfig() { /** * Returns a builder class for configuring BaseAllocator's options. */ - public static ConfigBuilder configBuilder() { - return new ConfigBuilder(); + public static ImmutableConfig.Builder configBuilder() { + return ImmutableConfig.builder(); } /** * Config class of {@link BaseAllocator}. */ - protected static class Config { - protected final AllocationManager.Factory allocationManagerFactory; - protected final AllocationListener listener; - protected final long initReservation; - protected final long maxAllocation; - protected final RoundingPolicy roundingPolicy; - - /** - * @param allocationManagerFactory factory for creating {@link AllocationManager} instances - * @param listener listener callback. Must be non-null - * @param initReservation initial reservation. Cannot be modified after construction - * @param maxAllocation limit. Allocations past the limit fail. Can be modified after - * construction - * @param roundingPolicy the policy for rounding the buffer size - */ - private Config(AllocationManager.Factory allocationManagerFactory, - AllocationListener listener, - long initReservation, - long maxAllocation, - RoundingPolicy roundingPolicy) { - this.allocationManagerFactory = allocationManagerFactory; - this.listener = listener; - this.initReservation = initReservation; - this.maxAllocation = maxAllocation; - this.roundingPolicy = roundingPolicy; - } - - private Config(ConfigBuilder builder) { - this(builder.allocationManagerFactory, - builder.listener, - builder.initReservation, - builder.maxAllocation, - builder.roundingPolicy); - } - } - - /** - * Builder class for {@link Config}. - */ - public static class ConfigBuilder { - private AllocationManager.Factory allocationManagerFactory = NettyAllocationManager.FACTORY; - private AllocationListener listener = AllocationListener.NOOP; - private long initReservation = 0; - private long maxAllocation = Long.MAX_VALUE; - private RoundingPolicy roundingPolicy = DefaultRoundingPolicy.INSTANCE; - - private ConfigBuilder() { - } - - /** - * Specify a factory for creating {@link AllocationManager} instances. - * - *

Default value: NettyAllocationManager.FACTORY - * - * @return this ConfigBuilder - * @see AllocationManager - */ - public ConfigBuilder allocationManagerFactory(AllocationManager.Factory allocationManagerFactory) { - this.allocationManagerFactory = allocationManagerFactory; - return this; - } - + @Value.Immutable + abstract static class Config { /** - * Specify a listener callback. Must be non-null. - * - *

Default value: AllocationListener.NOOP - * - * @return this ConfigBuilder - * @see AllocationListener + * Factory for creating {@link AllocationManager} instances. */ - public ConfigBuilder listener(AllocationListener listener) { - this.listener = listener; - return this; + @Value.Default + AllocationManager.Factory getAllocationManagerFactory() { + return NettyAllocationManager.FACTORY; } /** - * Specify the initial reservation size (in bytes) for this allocator. - * - *

Default value: 0 - * - * @return this ConfigBuilder + * Listener callback. Must be non-null. */ - public ConfigBuilder initReservation(long initReservation) { - this.initReservation = initReservation; - return this; + @Value.Default + AllocationListener getListener() { + return AllocationListener.NOOP; } /** - * Specify the max allocation size (in bytes) for this allocator. - * - *

Default value: Long.MAX_VALUE - * - * @return this ConfigBuilder + * Initial reservation size (in bytes) for this allocator. */ - public ConfigBuilder maxAllocation(long maxAllocation) { - this.maxAllocation = maxAllocation; - return this; + @Value.Default + long getInitReservation() { + return 0; } /** - * Specify a {@link RoundingPolicy} instance for rounding the buffer size - * - *

Default value: DefaultRoundingPolicy.INSTANCE - * - * @return this ConfigBuilder - * @see RoundingPolicy + * Max allocation size (in bytes) for this allocator, allocations past this limit fail. + * Can be modified after construction. */ - public ConfigBuilder roundingPolicy(RoundingPolicy roundingPolicy) { - this.roundingPolicy = roundingPolicy; - return this; + @Value.Default + long getMaxAllocation() { + return Long.MAX_VALUE; } /** - * Create the {@link Config} instance. + * The policy for rounding the buffer size. */ - public Config create() { - return new Config(this); + @Value.Default + RoundingPolicy getRoundingPolicy() { + return DefaultRoundingPolicy.INSTANCE; } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index 81440d04440..4d7aa93df6b 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -554,7 +554,7 @@ public T unwrap(Class clazz) { if (clazz == UnsafeDirectLittleEndian.class) { Preconditions.checkState(allocationManager instanceof NettyAllocationManager, - "Underlying memory was not allocated by Netty"); + "Underlying memory was not allocated by Netty"); return clazz.cast(((NettyAllocationManager) allocationManager).getMemoryChunk()); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index e490e3a006f..5eb56d6b05d 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -52,7 +52,7 @@ public RootAllocator(final AllocationListener listener, final long limit, Roundi .listener(listener) .maxAllocation(limit) .roundingPolicy(roundingPolicy) - .create() + .build() ); } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index ca957a2780c..018d67f45b7 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -416,7 +416,7 @@ private Unsafe getUnsafe() { } } } - }).create()); + }).build()); } // Allocation listener diff --git a/java/pom.xml b/java/pom.xml index 9aaaefac4c8..108710018e8 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -379,6 +379,11 @@ error_prone_core 2.3.3 + + org.immutables + value + 2.8.2 + @@ -558,6 +563,12 @@ javax.annotation-api 1.3.2 + + org.immutables + value + 2.8.2 + provided + From c0345c0af39811fcfa8905280da8f7113aa7d5be Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 9 Jan 2020 16:43:10 +0800 Subject: [PATCH 21/21] Expand star import --- .../java/org/apache/arrow/memory/TestBaseAllocator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 018d67f45b7..f7d120b1e87 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -18,7 +18,11 @@ package org.apache.arrow.memory; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows;