diff --git a/src/com/amazon/ion/impl/bin/utf8/Pool.java b/src/com/amazon/ion/impl/bin/utf8/Pool.java index 647055bcc4..3647541872 100644 --- a/src/com/amazon/ion/impl/bin/utf8/Pool.java +++ b/src/com/amazon/ion/impl/bin/utf8/Pool.java @@ -1,6 +1,8 @@ package com.amazon.ion.impl.bin.utf8; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; abstract class Pool> { @@ -22,14 +24,19 @@ interface Allocator> { private static final int MAX_QUEUE_SIZE = 128; // A queue of previously initialized objects that can be loaned out. - private final ArrayBlockingQueue bufferQueue; + private final Queue objectQueue; + + // The current size of the queue. Note: some implementations of Queue.size() (including ConcurrentLinkedQueue's) + // are not constant-time operations. Tracking the size externally is a performance optimization. + private final AtomicInteger size; // Allocator of objects to be pooled. private final Allocator allocator; Pool(Allocator allocator) { this.allocator = allocator; - bufferQueue = new ArrayBlockingQueue(MAX_QUEUE_SIZE); + objectQueue = new ConcurrentLinkedQueue(); + size = new AtomicInteger(0); } /** @@ -40,10 +47,13 @@ interface Allocator> { */ public T getOrCreate() { // The `poll` method does not block. If the queue is empty it returns `null` immediately. - T object = bufferQueue.poll(); + T object = objectQueue.poll(); if (object == null) { - // No buffers were available in the pool. Create a new one. + // No objects were available in the pool. Create a new one. object = allocator.newInstance(this); + } else { + // An object was retrieved from the pool; decrement the pool size. + size.decrementAndGet(); } return object; } @@ -59,6 +69,17 @@ public T getOrCreate() { public void returnToPool(T object) { // The `offer` method does not block. If the queue is full, it returns `false` immediately. // If the provided instance cannot be added to the pool, we discard it silently. - bufferQueue.offer(object); + if (size.getAndIncrement() < MAX_QUEUE_SIZE) { + objectQueue.offer(object); + } else { + // The pool was full. Since the size was optimistically incremented, decrement it now. + // Note: there is a race condition here that is deliberately allowed as an optimization. + // Under high contention, multiple threads could end up here before the first one + // decrements the size, causing objects to be dropped wastefully. This is not harmful + // because objects will be re-allocated when necessary; the pool is kept as close as + // possible to capacity on a best-effort basis. This race condition should not be "fixed" + // without a thorough study of the performance implications. + size.decrementAndGet(); + } } }