Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
return existingLedger;
}

final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
final BufferLedger ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
Expand All @@ -151,54 +151,41 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
}
}


/**
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* now longer needs to hold
* a reference to particular piece of memory.
* Can only be called when you already hold the writeLock.
*/
private class ReleaseListener {

private final BufferAllocator allocator;

public ReleaseListener(BufferAllocator allocator) {
this.allocator = allocator;
}

/**
* Can only be called when you already hold the writeLock.
*/
public void release() {
allocator.assertOpen();
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
allocator.assertOpen();

final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);
final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);

if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}


}
}

Expand All @@ -221,16 +208,22 @@ public class BufferLedger {
// correctly
private final long lCreationTime = System.nanoTime();
private final BaseAllocator allocator;
private final ReleaseListener listener;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1)
: null;
private volatile long lDestructionTime = 0;

private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
private BufferLedger(BaseAllocator allocator) {
this.allocator = allocator;
this.listener = listener;
}

/**
* Get the allocator for this ledger
* @return allocator
*/
private BaseAllocator getAllocator() {
return allocator;
}

/**
Expand Down Expand Up @@ -340,7 +333,7 @@ public int decrement(int decrement) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
listener.release();
release(this);
}
}

Expand Down