Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void updatePeak() {
* @param size to increase
* @return Whether the allocation fit within limits.
*/
boolean forceAllocate(long size) {
public boolean forceAllocate(long size) {
final AllocationOutcome.Status outcome = allocate(size, true, true, null);
return outcome.isOk();
}
Expand Down Expand Up @@ -220,7 +220,6 @@ public void releaseBytes(long size) {
final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
parent.releaseBytes(actualToReleaseToParent);
}

}

public boolean isOverLimit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public abstract class AllocationManager {

private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);

private final RootAllocator root;
private final BufferAllocator root;
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
// ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
// see JIRA for details
private final LowCostIdentityHashMap<BaseAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
private final LowCostIdentityHashMap<BufferAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
private final long amCreationTime = System.nanoTime();

// The ReferenceManager created at the time of creation of this AllocationManager
Expand All @@ -60,11 +60,11 @@ public abstract class AllocationManager {
private volatile BufferLedger owningLedger;
private volatile long amDestructionTime = 0;

protected AllocationManager(BaseAllocator accountingAllocator) {
protected AllocationManager(BufferAllocator accountingAllocator) {
Preconditions.checkNotNull(accountingAllocator);
accountingAllocator.assertOpen();

this.root = accountingAllocator.root;
this.root = accountingAllocator.getRoot();

// we do a no retain association since our creator will want to retrieve the newly created
// ledger and will create a reference count at that point
Expand All @@ -87,13 +87,13 @@ void setOwningLedger(final BufferLedger ledger) {
* @return The reference manager (new or existing) that associates the underlying
* buffer to this new ledger.
*/
BufferLedger associate(final BaseAllocator allocator) {
BufferLedger associate(final BufferAllocator allocator) {
return associate(allocator, true);
}

private BufferLedger associate(final BaseAllocator allocator, final boolean retain) {
private BufferLedger associate(final BufferAllocator allocator, final boolean retain) {
allocator.assertOpen();
Preconditions.checkState(root == allocator.root,
Preconditions.checkState(root == allocator.getRoot(),
"A buffer can only be associated between two allocators that share the same root");

synchronized (this) {
Expand All @@ -118,9 +118,11 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
Preconditions.checkState(oldLedger == null,
"Detected inconsistent state: A reference manager already exists for this allocator");

// needed for debugging only: keep a pointer to reference manager inside allocator
// to dump state, verify allocator state etc
allocator.associateLedger(ledger);
if (allocator instanceof BaseAllocator) {
// needed for debugging only: keep a pointer to reference manager inside allocator
// to dump state, verify allocator state etc
((BaseAllocator) allocator).associateLedger(ledger);
}
return ledger;
}
}
Expand All @@ -133,7 +135,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
* calling ReferenceManager drops to 0.
*/
void release(final BufferLedger ledger) {
final BaseAllocator allocator = (BaseAllocator) ledger.getAllocator();
final BufferAllocator allocator = ledger.getAllocator();
allocator.assertOpen();

// remove the <BaseAllocator, BufferLedger> mapping for the allocator
Expand All @@ -142,20 +144,23 @@ void release(final BufferLedger ledger) {
"Expecting a mapping for allocator and reference manager");
final BufferLedger oldLedger = map.remove(allocator);

// needed for debug only: tell the allocator that AllocationManager is removing a
// reference manager associated with this particular allocator
((BaseAllocator) oldLedger.getAllocator()).dissociateLedger(oldLedger);
BufferAllocator oldAllocator = oldLedger.getAllocator();
if (oldAllocator instanceof BaseAllocator) {
// needed for debug only: tell the allocator that AllocationManager is removing a
// reference manager associated with this particular allocator
((BaseAllocator) oldAllocator).dissociateLedger(oldLedger);
}

if (oldLedger == owningLedger) {
// the release call was made by the owning reference manager
if (map.isEmpty()) {
// the only <allocator, reference manager> mapping was for the owner
// which now has been removed, it implies we can safely destroy the
// underlying memory chunk as it is no longer being referenced
((BaseAllocator) oldLedger.getAllocator()).releaseBytes(getSize());
oldAllocator.releaseBytes(getSize());
// free the memory chunk associated with the allocation manager
release0();
((BaseAllocator) oldLedger.getAllocator()).getListener().onRelease(getSize());
oldAllocator.getListener().onRelease(getSize());
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
Expand Down Expand Up @@ -209,7 +214,7 @@ public interface Factory {
* @param size Size (in bytes) of memory managed by the AllocationManager
* @return The created AllocationManager used by this allocator
*/
AllocationManager create(BaseAllocator accountingAllocator, long size);
AllocationManager create(BufferAllocator accountingAllocator, long size);

ArrowBuf empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build();

// Package exposed for sharing between AllocatorManger and BaseAllocator objects
final String name;
final RootAllocator root;
private final String name;
private final RootAllocator root;
private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
final AllocationListener listener;
private final AllocationListener listener;
private final BaseAllocator parentAllocator;
private final Map<BaseAllocator, Object> childAllocators;
private final ArrowBuf empty;
Expand Down Expand Up @@ -124,7 +124,8 @@ protected BaseAllocator(
this.roundingPolicy = config.getRoundingPolicy();
}

AllocationListener getListener() {
@Override
public AllocationListener getListener() {
return listener;
}

Expand Down Expand Up @@ -314,6 +315,11 @@ private AllocationManager newAllocationManager(BaseAllocator accountingAllocator
return allocationManagerFactory.create(accountingAllocator, size);
}

@Override
public BufferAllocator getRoot() {
return root;
}

@Override
public BufferAllocator newChildAllocator(
final String name,
Expand Down Expand Up @@ -343,7 +349,7 @@ public BufferAllocator newChildAllocator(
synchronized (DEBUG_LOCK) {
childAllocators.put(childAllocator, childAllocator);
historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name,
childAllocator.name);
childAllocator.getName());
}
} else {
childAllocators.put(childAllocator, childAllocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public interface BufferAllocator extends AutoCloseable {
*/
ArrowBuf buffer(long size, BufferManager manager);

/**
* Get the root allocator of this allocator. If this allocator is already a root, return
* this directly.
*
* @return The root allocator
*/
BufferAllocator getRoot();

/**
* Create a new child allocator.
*
Expand Down Expand Up @@ -126,6 +134,30 @@ BufferAllocator newChildAllocator(
*/
long getHeadroom();

/**
* Forcibly allocate bytes. Returns whether the allocation fit within limits.
*
* @param size to increase
* @return Whether the allocation fit within limits.
*/
boolean forceAllocate(long size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see where this or releaseBytes are used in code. What is the purpose of releaseBytes and of makign forceAllocate public

Copy link
Member Author

@zhztheplayer zhztheplayer Jul 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @rymurr,

E.g. Please see the changes such as

https://github.com/apache/arrow/pull/7768/files#diff-68d17bac8aafb7004e2fb310f131df5fR159-R163

The uses of BufferAllocator requires for having the relevant methods extracted up to interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 sorry, missed that



/**
* Release bytes from this allocator.
*
* @param size to release
*/
void releaseBytes(long size);

/**
* Returns the allocation listener used by this allocator.
*
* @return the {@link AllocationListener} instance. Or {@link AllocationListener#NOOP} by default if no listener
* is configured when this allocator was created.
*/
AllocationListener getListener();

/**
* Returns the parent allocator.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* ArrowBufs managed by this reference manager share a common
* fate (same reference count).
*/
public class BufferLedger implements ValueWithKeyIncluded<BaseAllocator>, ReferenceManager {
public class BufferLedger implements ValueWithKeyIncluded<BufferAllocator>, ReferenceManager {
private final IdentityHashMap<ArrowBuf, Object> buffers =
BaseAllocator.DEBUG ? new IdentityHashMap<>() : null;
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
Expand All @@ -41,14 +41,14 @@ public class BufferLedger implements ValueWithKeyIncluded<BaseAllocator>, Refere
// manage request for retain
// correctly
private final long lCreationTime = System.nanoTime();
private final BaseAllocator allocator;
private final BufferAllocator allocator;
private final AllocationManager allocationManager;
private final HistoricalLog historicalLog =
BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1) : null;
private volatile long lDestructionTime = 0;

BufferLedger(final BaseAllocator allocator, final AllocationManager allocationManager) {
BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) {
this.allocator = allocator;
this.allocationManager = allocationManager;
}
Expand All @@ -57,7 +57,7 @@ boolean isOwningLedger() {
return this == allocationManager.getOwningLedger();
}

public BaseAllocator getKey() {
public BufferAllocator getKey() {
return allocator;
}

Expand Down Expand Up @@ -238,7 +238,7 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt
"ArrowBuf(BufferLedger, BufferAllocator[%s], " +
"UnsafeDirectLittleEndian[identityHashCode == " +
"%d](%s)) => ledger hc == %d",
allocator.name, System.identityHashCode(derivedBuf), derivedBuf.toString(),
allocator.getName(), System.identityHashCode(derivedBuf), derivedBuf.toString(),
System.identityHashCode(this));

synchronized (buffers) {
Expand Down Expand Up @@ -275,7 +275,7 @@ ArrowBuf newArrowBuf(final long length, final BufferManager manager) {
historicalLog.recordEvent(
"ArrowBuf(BufferLedger, BufferAllocator[%s], " +
"UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d",
allocator.name, System.identityHashCode(buf), buf.toString(),
allocator.getName(), System.identityHashCode(buf), buf.toString(),
System.identityHashCode(this));

synchronized (buffers) {
Expand Down Expand Up @@ -317,7 +317,7 @@ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) {
// alternatively, if there was already a mapping for <buffer allocator, ref manager> in
// allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
// and this will be true for all the existing buffers currently managed by targetrefmanager
final BufferLedger targetRefManager = allocationManager.associate((BaseAllocator) target);
final BufferLedger targetRefManager = allocationManager.associate(target);
// create a new ArrowBuf to associate with new allocator and target ref manager
final long targetBufLength = srcBuffer.capacity();
ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
Expand All @@ -336,8 +336,8 @@ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) {
boolean transferBalance(final ReferenceManager targetReferenceManager) {
Preconditions.checkArgument(targetReferenceManager != null,
"Expecting valid target reference manager");
final BaseAllocator targetAllocator = (BaseAllocator) targetReferenceManager.getAllocator();
Preconditions.checkArgument(allocator.root == targetAllocator.root,
final BufferAllocator targetAllocator = targetReferenceManager.getAllocator();
Preconditions.checkArgument(allocator.getRoot() == targetAllocator.getRoot(),
"You can only transfer between two allocators that share the same root.");

allocator.assertOpen();
Expand Down Expand Up @@ -411,7 +411,7 @@ public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAl
// alternatively, if there was already a mapping for <buffer allocator, ref manager> in
// allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
// and this will be true for all the existing buffers currently managed by targetrefmanager
final BufferLedger targetRefManager = allocationManager.associate((BaseAllocator) target);
final BufferLedger targetRefManager = allocationManager.associate(target);
// create a new ArrowBuf to associate with new allocator and target ref manager
final long targetBufLength = srcBuffer.capacity();
final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
Expand Down Expand Up @@ -486,7 +486,7 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
.append("ledger[")
.append(ledgerId)
.append("] allocator: ")
.append(allocator.name)
.append(allocator.getName())
.append("), isOwning: ")
.append(", size: ")
.append(", references: ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor
MemoryUtil.UNSAFE.allocateMemory(0));

@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
public AllocationManager create(BufferAllocator accountingAllocator, long size) {
return new AllocationManager(accountingAllocator) {
private final long allocatedSize = size;
private final long address = MemoryUtil.UNSAFE.allocateMemory(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor
public static final AllocationManager.Factory FACTORY = NettyAllocationManager.FACTORY;

@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
public AllocationManager create(BufferAllocator accountingAllocator, long size) {
return FACTORY.create(accountingAllocator, size);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class NettyAllocationManager extends AllocationManager {
public static final AllocationManager.Factory FACTORY = new AllocationManager.Factory() {

@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
public AllocationManager create(BufferAllocator accountingAllocator, long size) {
return new NettyAllocationManager(accountingAllocator, size);
}

Expand Down Expand Up @@ -65,7 +65,7 @@ public ArrowBuf empty() {
*/
private final int allocationCutOffValue;

NettyAllocationManager(BaseAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) {
NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) {
super(accountingAllocator);
this.allocationCutOffValue = allocationCutOffValue;

Expand All @@ -80,7 +80,7 @@ public ArrowBuf empty() {
}
}

NettyAllocationManager(BaseAllocator accountingAllocator, long requestedSize) {
NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize) {
this(accountingAllocator, requestedSize, DEFAULT_ALLOCATION_CUTOFF_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ private BaseAllocator createAllocatorWithCustomizedAllocationManager() {
.maxAllocation(MAX_ALLOCATION)
.allocationManagerFactory(new AllocationManager.Factory() {
@Override
public AllocationManager create(BaseAllocator accountingAllocator, long requestedSize) {
public AllocationManager create(BufferAllocator accountingAllocator, long requestedSize) {
return new AllocationManager(accountingAllocator) {
private final Unsafe unsafe = getUnsafe();
private final long address = unsafe.allocateMemory(requestedSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private BaseAllocator createCustomizedAllocator() {
return new RootAllocator(BaseAllocator.configBuilder()
.allocationManagerFactory(new AllocationManager.Factory() {
@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
public AllocationManager create(BufferAllocator accountingAllocator, long size) {
return new NettyAllocationManager(accountingAllocator, size, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor
public static final AllocationManager.Factory FACTORY = UnsafeAllocationManager.FACTORY;

@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
public AllocationManager create(BufferAllocator accountingAllocator, long size) {
return FACTORY.create(accountingAllocator, size);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class UnsafeAllocationManager extends AllocationManager {

public static final AllocationManager.Factory FACTORY = new Factory() {
@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
public AllocationManager create(BufferAllocator accountingAllocator, long size) {
return new UnsafeAllocationManager(accountingAllocator, size);
}

Expand All @@ -46,7 +46,7 @@ public ArrowBuf empty() {

private final long allocatedAddress;

UnsafeAllocationManager(BaseAllocator accountingAllocator, long requestedSize) {
UnsafeAllocationManager(BufferAllocator accountingAllocator, long requestedSize) {
super(accountingAllocator);
allocatedAddress = MemoryUtil.UNSAFE.allocateMemory(requestedSize);
allocatedSize = requestedSize;
Expand Down