diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index d516d9c0150..f35f4ae0452 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -21,17 +21,13 @@
import org.apache.arrow.util.Preconditions;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
-
/**
- * Manages the relationship between one or more allocators and a particular UDLE. Ensures that
+ * The abstract base class of AllocationManager.
+ *
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that
* one allocator owns the
* memory that multiple allocators may be referencing. Manages a BufferLedger between each of its
* associated allocators.
- * This class is also responsible for managing when memory is allocated and returned to the
- * Netty-based
- * PooledByteBufAllocatorL.
*
*
The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's
* package which need access
@@ -47,22 +43,17 @@
* typical query. The
* contention of acquiring a lock on AllocationManager should be very low.
*/
-public class AllocationManager {
+public abstract class AllocationManager {
private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
- private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL();
-
- static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty;
- static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize();
private final RootAllocator root;
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
- private final int size;
- private final UnsafeDirectLittleEndian memoryChunk;
// ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
// see JIRA for details
private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>();
private final long amCreationTime = System.nanoTime();
+ private final int size;
// The ReferenceManager created at the time of creation of this AllocationManager
// is treated as the owning reference manager for the underlying chunk of memory
@@ -70,17 +61,16 @@ public class AllocationManager {
private volatile BufferLedger owningLedger;
private volatile long amDestructionTime = 0;
- AllocationManager(BaseAllocator accountingAllocator, int size) {
+ protected AllocationManager(BaseAllocator accountingAllocator, int size) {
Preconditions.checkNotNull(accountingAllocator);
accountingAllocator.assertOpen();
+ this.size = size;
this.root = accountingAllocator.root;
- this.memoryChunk = INNER_ALLOCATOR.allocate(size);
// we do a no retain association since our creator will want to retrieve the newly created
// ledger and will create a reference count at that point
this.owningLedger = associate(accountingAllocator, false);
- this.size = memoryChunk.capacity();
}
BufferLedger getOwningLedger() {
@@ -91,14 +81,6 @@ void setOwningLedger(final BufferLedger ledger) {
this.owningLedger = ledger;
}
- /**
- * Get the underlying memory chunk managed by this AllocationManager.
- * @return buffer
- */
- UnsafeDirectLittleEndian getMemoryChunk() {
- return memoryChunk;
- }
-
/**
* Associate the existing underlying buffer with a new allocator. This will increase the
* reference count on the corresponding buffer ledger by 1
@@ -172,10 +154,10 @@ void release(final BufferLedger ledger) {
// the only mapping was for the owner
// which now has been removed, it implies we can safely destroy the
// underlying memory chunk as it is no longer being referenced
- ((BaseAllocator)oldLedger.getAllocator()).releaseBytes(size);
+ ((BaseAllocator)oldLedger.getAllocator()).releaseBytes(getSize());
// free the memory chunk associated with the allocation manager
- memoryChunk.release();
- ((BaseAllocator)oldLedger.getAllocator()).getListener().onRelease(size);
+ release0();
+ ((BaseAllocator)oldLedger.getAllocator()).getListener().onRelease(getSize());
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
@@ -198,9 +180,37 @@ void release(final BufferLedger ledger) {
/**
* Return the size of underlying chunk of memory managed by this Allocation Manager.
- * @return size of memory chunk
+ *
+ * @return size of underlying memory chunk
*/
public int getSize() {
return size;
}
+
+ /**
+ * Return the absolute memory address pointing to the fist byte of underling memory chunk.
+ */
+ protected abstract long memoryAddress();
+
+ /**
+ * Release the underling memory chunk.
+ */
+ protected abstract void release0();
+
+ /**
+ * A factory interface for creating {@link AllocationManager}.
+ * One may extend this interface to use a user-defined AllocationManager implementation.
+ */
+ public interface Factory {
+
+ /**
+ * Create an {@link AllocationManager}.
+ *
+ * @param accountingAllocator The allocator that are expected to be associated with newly created AllocationManager.
+ * Currently it is always equivalent to "this"
+ * @param size Size (in bytes) of memory managed by the AllocationManager
+ * @return The created AllocationManager used by this allocator
+ */
+ AllocationManager create(BaseAllocator accountingAllocator, int size);
+ }
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 47fd9e93a92..efcbd546efc 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -25,13 +25,14 @@
import java.util.Map;
import java.util.Set;
+import org.apache.arrow.memory.rounding.DefaultRoundingPolicy;
import org.apache.arrow.memory.rounding.RoundingPolicy;
import org.apache.arrow.memory.util.AssertionUtil;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.util.Preconditions;
+import org.immutables.value.Value;
import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.UnsafeDirectLittleEndian;
import io.netty.util.internal.OutOfDirectMemoryError;
/**
@@ -47,6 +48,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled() ||
Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
+ public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build();
+
// Package exposed for sharing between AllocatorManger and BaseAllocator objects
final String name;
final RootAllocator root;
@@ -60,29 +63,27 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private final IdentityHashMap childLedgers;
private final IdentityHashMap reservations;
private final HistoricalLog historicalLog;
- private volatile boolean isClosed = false; // the allocator has been closed
private final RoundingPolicy roundingPolicy;
+ private final AllocationManager.Factory allocationManagerFactory;
+
+ private volatile boolean isClosed = false; // the allocator has been closed
/**
* Initialize an allocator
* @param parentAllocator parent allocator. null if defining a root allocator
- * @param listener listener callback. Must be non-null -- use
- * {@link AllocationListener#NOOP} if no listener desired
* @param name name of this allocator
- * @param initReservation initial reservation. Cannot be modified after construction
- * @param maxAllocation limit. Allocations past the limit fail. Can be modified after
- * construction
+ * @param config configuration including other options of this allocator
+ *
+ * @see Config
*/
protected BaseAllocator(
final BaseAllocator parentAllocator,
- final AllocationListener listener,
final String name,
- final long initReservation,
- final long maxAllocation,
- final RoundingPolicy roundingPolicy) throws OutOfMemoryException {
- super(parentAllocator, name, initReservation, maxAllocation);
+ final Config config) throws OutOfMemoryException {
+ super(parentAllocator, name, config.getInitReservation(), config.getMaxAllocation());
- this.listener = listener;
+ this.listener = config.getListener();
+ this.allocationManagerFactory = config.getAllocationManagerFactory();
if (parentAllocator != null) {
this.root = parentAllocator.root;
@@ -111,7 +112,7 @@ protected BaseAllocator(
historicalLog = null;
childLedgers = null;
}
- this.roundingPolicy = roundingPolicy;
+ this.roundingPolicy = config.getRoundingPolicy();
}
AllocationListener getListener() {
@@ -119,7 +120,7 @@ AllocationListener getListener() {
}
@Override
- public BufferAllocator getParentAllocator() {
+ public BaseAllocator getParentAllocator() {
return parentAllocator;
}
@@ -280,7 +281,7 @@ public ArrowBuf buffer(final int initialRequestSize) {
}
private ArrowBuf createEmpty() {
- return new ArrowBuf(ReferenceManager.NO_OP, null, 0, AllocationManager.EMPTY.memoryAddress(), true);
+ return new ArrowBuf(ReferenceManager.NO_OP, null, 0, NettyAllocationManager.EMPTY.memoryAddress(), true);
}
@Override
@@ -344,7 +345,7 @@ private ArrowBuf bufferWithoutReservation(
BufferManager bufferManager) throws OutOfMemoryException {
assertOpen();
- final AllocationManager manager = new AllocationManager(this, size);
+ final AllocationManager manager = newAllocationManager(size);
final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required)
final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager);
@@ -355,6 +356,15 @@ private ArrowBuf bufferWithoutReservation(
return buffer;
}
+ private AllocationManager newAllocationManager(int size) {
+ return newAllocationManager(this, size);
+ }
+
+
+ private AllocationManager newAllocationManager(BaseAllocator accountingAllocator, int size) {
+ return allocationManagerFactory.create(accountingAllocator, size);
+ }
+
@Override
public ArrowByteBufAllocator getAsByteBufAllocator() {
return thisAsByteBufAllocator;
@@ -377,7 +387,13 @@ public BufferAllocator newChildAllocator(
assertOpen();
final ChildAllocator childAllocator =
- new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy);
+ new ChildAllocator(this, name, configBuilder()
+ .listener(listener)
+ .initReservation(initReservation)
+ .maxAllocation(maxAllocation)
+ .roundingPolicy(roundingPolicy)
+ .allocationManagerFactory(allocationManagerFactory)
+ .build());
if (DEBUG) {
synchronized (DEBUG_LOCK) {
@@ -517,7 +533,7 @@ private void hist(String noteFormat, Object... args) {
* @throws IllegalStateException when any problems are found
*/
void verifyAllocator() {
- final IdentityHashMap seen = new IdentityHashMap<>();
+ final IdentityHashMap seen = new IdentityHashMap<>();
verifyAllocator(seen);
}
@@ -531,7 +547,7 @@ void verifyAllocator() {
* @throws IllegalStateException when any problems are found
*/
private void verifyAllocator(
- final IdentityHashMap buffersSeen) {
+ final IdentityHashMap buffersSeen) {
// The remaining tests can only be performed if we're in debug mode.
if (!DEBUG) {
return;
@@ -578,19 +594,19 @@ private void verifyAllocator(
continue;
}
- final UnsafeDirectLittleEndian udle = ledger.getUnderlying();
+ final AllocationManager am = ledger.getAllocationManager();
/*
* Even when shared, ArrowBufs are rewrapped, so we should never see the same instance
* twice.
*/
- final BaseAllocator otherOwner = buffersSeen.get(udle);
+ final BaseAllocator otherOwner = buffersSeen.get(am);
if (otherOwner != null) {
throw new IllegalStateException("This allocator's ArrowBuf already owned by another " +
"allocator");
}
- buffersSeen.put(udle, this);
+ buffersSeen.put(am, this);
- bufferTotal += udle.capacity();
+ bufferTotal += am.getSize();
}
// Preallocated space has to be accounted for
@@ -702,11 +718,11 @@ private void dumpBuffers(final StringBuilder sb, final Set ledgerS
if (!ledger.isOwningLedger()) {
continue;
}
- final UnsafeDirectLittleEndian udle = ledger.getUnderlying();
+ final AllocationManager am = ledger.getAllocationManager();
sb.append("UnsafeDirectLittleEndian[identityHashCode == ");
- sb.append(Integer.toString(System.identityHashCode(udle)));
+ sb.append(Integer.toString(System.identityHashCode(am)));
sb.append("] size ");
- sb.append(Integer.toString(udle.capacity()));
+ sb.append(Integer.toString(am.getSize()));
sb.append('\n');
}
}
@@ -729,6 +745,70 @@ public enum Verbosity {
}
}
+ /**
+ * Returns a default {@link Config} instance.
+ *
+ * @see ImmutableConfig.Builder
+ */
+ public static Config defaultConfig() {
+ return DEFAULT_CONFIG;
+
+ }
+
+ /**
+ * Returns a builder class for configuring BaseAllocator's options.
+ */
+ public static ImmutableConfig.Builder configBuilder() {
+ return ImmutableConfig.builder();
+ }
+
+ /**
+ * Config class of {@link BaseAllocator}.
+ */
+ @Value.Immutable
+ abstract static class Config {
+ /**
+ * Factory for creating {@link AllocationManager} instances.
+ */
+ @Value.Default
+ AllocationManager.Factory getAllocationManagerFactory() {
+ return NettyAllocationManager.FACTORY;
+ }
+
+ /**
+ * Listener callback. Must be non-null.
+ */
+ @Value.Default
+ AllocationListener getListener() {
+ return AllocationListener.NOOP;
+ }
+
+ /**
+ * Initial reservation size (in bytes) for this allocator.
+ */
+ @Value.Default
+ long getInitReservation() {
+ return 0;
+ }
+
+ /**
+ * Max allocation size (in bytes) for this allocator, allocations past this limit fail.
+ * Can be modified after construction.
+ */
+ @Value.Default
+ long getMaxAllocation() {
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * The policy for rounding the buffer size.
+ */
+ @Value.Default
+ RoundingPolicy getRoundingPolicy() {
+ return DefaultRoundingPolicy.INSTANCE;
+ }
+ }
+
/**
* Implementation of {@link AllocationReservation} that supports
* history tracking under {@linkplain #DEBUG} is true.
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java
index 90d74a3ca3b..4d7aa93df6b 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java
@@ -269,7 +269,7 @@ ArrowBuf newArrowBuf(final int length, final BufferManager manager) {
allocator.assertOpen();
// the start virtual address of the ArrowBuf will be same as address of memory chunk
- final long startAddress = allocationManager.getMemoryChunk().memoryAddress();
+ final long startAddress = allocationManager.memoryAddress();
// create ArrowBuf
final ArrowBuf buf = new ArrowBuf(this, manager, length, startAddress, false);
@@ -523,7 +523,41 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
}
}
+ /**
+ * @deprecated Use #unwrap(UnsafeDirectLittleEndian.class) instead.
+ */
+ @Deprecated
public UnsafeDirectLittleEndian getUnderlying() {
- return allocationManager.getMemoryChunk();
+ return unwrap(UnsafeDirectLittleEndian.class);
+ }
+
+ /**
+ * Get the {@link AllocationManager} used by this BufferLedger.
+ *
+ * @return The AllocationManager used by this BufferLedger.
+ */
+ public AllocationManager getAllocationManager() {
+ return allocationManager;
+ }
+
+ /**
+ * Return the {@link AllocationManager} used or underlying {@link UnsafeDirectLittleEndian} instance
+ * (in the case of we use a {@link NettyAllocationManager}), and cast to desired class.
+ *
+ * @param clazz The desired class to cast into
+ * @return The AllocationManager used by this BufferLedger, or the underlying UnsafeDirectLittleEndian object.
+ */
+ public T unwrap(Class clazz) {
+ if (clazz.isInstance(allocationManager)) {
+ return clazz.cast(allocationManager);
+ }
+
+ if (clazz == UnsafeDirectLittleEndian.class) {
+ Preconditions.checkState(allocationManager instanceof NettyAllocationManager,
+ "Underlying memory was not allocated by Netty");
+ return clazz.cast(((NettyAllocationManager) allocationManager).getMemoryChunk());
+ }
+
+ throw new IllegalArgumentException("Unexpected unwrapping class: " + clazz);
}
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
index d6bac3d58f1..67156f89d13 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -18,8 +18,6 @@
package org.apache.arrow.memory;
-import org.apache.arrow.memory.rounding.RoundingPolicy;
-
/**
* Child allocator class. Only slightly different from the {@see RootAllocator},
* in that these can't be created directly, but must be obtained from
@@ -33,25 +31,14 @@ class ChildAllocator extends BaseAllocator {
/**
* Constructor.
*
- * @param listener Allocation listener to be used in this child
* @param parentAllocator parent allocator -- the one creating this child
* @param name the name of this child allocator
- * @param initReservation initial amount of space to reserve (obtained from the parent)
- * @param maxAllocation maximum amount of space that can be obtained from this allocator; note
- * this includes direct allocations (via {@see BufferAllocator#buffer(int,
- *int)} et al) and requests from descendant allocators. Depending on the
- * allocation policy in force, even less memory may be available
- * @param roundingPolicy the policy for rounding requested buffer size
+ * @param config configuration of this child allocator
*/
ChildAllocator(
- AllocationListener listener,
BaseAllocator parentAllocator,
String name,
- long initReservation,
- long maxAllocation,
- RoundingPolicy roundingPolicy) {
- super(parentAllocator, listener, name, initReservation, maxAllocation, roundingPolicy);
+ Config config) {
+ super(parentAllocator, name, config);
}
-
-
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java
new file mode 100644
index 00000000000..6e7a1065f99
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+/**
+ * The default implementation of AllocationManagerBase. The implementation is responsible for managing when memory
+ * is allocated and returned to the Netty-based PooledByteBufAllocatorL.
+ */
+public class NettyAllocationManager extends AllocationManager {
+
+ public static final Factory FACTORY = new Factory();
+
+ private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL();
+ static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty;
+ static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize();
+
+ private final int size;
+ private final UnsafeDirectLittleEndian memoryChunk;
+
+ NettyAllocationManager(BaseAllocator accountingAllocator, int size) {
+ super(accountingAllocator, size);
+ this.memoryChunk = INNER_ALLOCATOR.allocate(size);
+ this.size = memoryChunk.capacity();
+ }
+
+ /**
+ * Get the underlying memory chunk managed by this AllocationManager.
+ * @return buffer
+ */
+ UnsafeDirectLittleEndian getMemoryChunk() {
+ return memoryChunk;
+ }
+
+ @Override
+ protected long memoryAddress() {
+ return memoryChunk.memoryAddress();
+ }
+
+ @Override
+ protected void release0() {
+ memoryChunk.release();
+ }
+
+ @Override
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Factory for creating {@link NettyAllocationManager}.
+ */
+ public static class Factory implements AllocationManager.Factory {
+ private Factory() {}
+
+ @Override
+ public AllocationManager create(BaseAllocator accountingAllocator, int size) {
+ return new NettyAllocationManager(accountingAllocator, size);
+ }
+ }
+}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 02e27ab2822..5eb56d6b05d 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -40,8 +40,24 @@ public RootAllocator(final AllocationListener listener, final long limit) {
this(listener, limit, DefaultRoundingPolicy.INSTANCE);
}
+ /**
+ * Constructor.
+ *
+ * @param listener the allocation listener
+ * @param limit max allocation size in bytes
+ * @param roundingPolicy the policy for rounding the buffer size
+ */
public RootAllocator(final AllocationListener listener, final long limit, RoundingPolicy roundingPolicy) {
- super(null, listener, "ROOT", 0, limit, roundingPolicy);
+ this(configBuilder()
+ .listener(listener)
+ .maxAllocation(limit)
+ .roundingPolicy(roundingPolicy)
+ .build()
+ );
+ }
+
+ public RootAllocator(Config config) {
+ super(null, "ROOT", config);
}
/**
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java
index 7bc8393d9e4..cfaadf8b26a 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java
@@ -19,8 +19,8 @@
import java.lang.reflect.Field;
-import org.apache.arrow.memory.AllocationManager;
import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.NettyAllocationManager;
/**
* The default rounding policy. That is, if the requested size is within the chunk size,
@@ -38,7 +38,7 @@ public class DefaultRoundingPolicy implements RoundingPolicy {
private DefaultRoundingPolicy() {
try {
- Field field = AllocationManager.class.getDeclaredField("CHUNK_SIZE");
+ Field field = NettyAllocationManager.class.getDeclaredField("CHUNK_SIZE");
field.setAccessible(true);
chunkSize = (Long) field.get(null);
} catch (Exception e) {
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index fd5bccd58c9..f7d120b1e87 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -17,6 +17,7 @@
package org.apache.arrow.memory;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
@@ -25,6 +26,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -38,6 +40,7 @@
import org.junit.jupiter.api.Assertions;
import io.netty.buffer.ArrowBuf;
+import sun.misc.Unsafe;
public class TestBaseAllocator {
@@ -359,6 +362,67 @@ public void testSegmentAllocator_segmentSizeNotPowerOf2() {
assertEquals("The segment size must be a power of 2", e.getMessage());
}
+ @Test
+ public void testCustomizedAllocationManager() {
+ try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) {
+ final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf1);
+
+ arrowBuf1.setInt(0, 1);
+ assertEquals(1, arrowBuf1.getInt(0));
+
+ try {
+ final ArrowBuf arrowBuf2 = allocator.buffer(1);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ arrowBuf1.getReferenceManager().release();
+
+ try {
+ arrowBuf1.getInt(0);
+ fail("data read from released buffer");
+ } catch (RuntimeException e) {
+ // expected
+ }
+ }
+ }
+
+ private BaseAllocator createAllocatorWithCustomizedAllocationManager() {
+ return new RootAllocator(BaseAllocator.configBuilder()
+ .maxAllocation(MAX_ALLOCATION)
+ .allocationManagerFactory((accountingAllocator, size) -> new AllocationManager(accountingAllocator, size) {
+ private final Unsafe unsafe = getUnsafe();
+ private final long address = unsafe.allocateMemory(size);
+
+ @Override
+ protected long memoryAddress() {
+ return address;
+ }
+
+ @Override
+ protected void release0() {
+ unsafe.setMemory(address, size, (byte) 0);
+ unsafe.freeMemory(address);
+ }
+
+ private Unsafe getUnsafe() {
+ Field f = null;
+ try {
+ f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (Unsafe) f.get(null);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (f != null) {
+ f.setAccessible(false);
+ }
+ }
+ }
+ }).build());
+ }
+
// Allocation listener
// It counts the number of times it has been invoked, and how much memory allocation it has seen
// When set to 'expand on fail', it attempts to expand the associated allocator's limit
diff --git a/java/pom.xml b/java/pom.xml
index 9aaaefac4c8..108710018e8 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -379,6 +379,11 @@
error_prone_core
2.3.3
+
+ org.immutables
+ value
+ 2.8.2
+
@@ -558,6 +563,12 @@
javax.annotation-api
1.3.2
+
+ org.immutables
+ value
+ 2.8.2
+ provided
+