From dc85a4f611719920e0e3ac5277219d00a0a7562c Mon Sep 17 00:00:00 2001 From: leventov Date: Sat, 21 Jan 2017 00:08:29 -0600 Subject: [PATCH 1/4] Specialize LoadBalancingPool as MemcacheClientPool, reduce locking and don't override Object.finalize() --- .../druid/collections/LoadBalancingPool.java | 144 ------------------ .../client/cache/MemcacheClientPool.java | 141 +++++++++++++++++ .../io/druid/client/cache/MemcachedCache.java | 3 +- 3 files changed, 142 insertions(+), 146 deletions(-) delete mode 100644 common/src/main/java/io/druid/collections/LoadBalancingPool.java create mode 100644 server/src/main/java/io/druid/client/cache/MemcacheClientPool.java diff --git a/common/src/main/java/io/druid/collections/LoadBalancingPool.java b/common/src/main/java/io/druid/collections/LoadBalancingPool.java deleted file mode 100644 index 347558f1cc82..000000000000 --- a/common/src/main/java/io/druid/collections/LoadBalancingPool.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.collections; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; - -import io.druid.java.util.common.logger.Logger; - -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Simple load balancing pool that always returns the least used item. - * - * An item's usage is incremented every time one gets requested from the pool - * and is decremented every time close is called on the holder. - * - * The pool eagerly instantiates all the items in the pool when created, - * using the given supplier. - * - * @param type of items to pool - */ -public class LoadBalancingPool implements Supplier> -{ - private static final Logger log = new Logger(LoadBalancingPool.class); - - private final Supplier generator; - private final int capacity; - private final PriorityBlockingQueue queue; - - public LoadBalancingPool(int capacity, Supplier generator) - { - Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0"); - Preconditions.checkNotNull(generator); - - this.generator = generator; - this.capacity = capacity; - this.queue = new PriorityBlockingQueue<>(capacity); - - // eagerly intantiate all items in the pool - init(); - } - - private void init() { - for(int i = 0; i < capacity; ++i) { - queue.offer(new CountingHolder(generator.get())); - } - } - - public ResourceHolder get() - { - final CountingHolder holder; - // items never stay out of the queue for long, so we'll get one eventually - try { - holder = queue.take(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } - - // synchronize on item to ensure count cannot get changed by - // CountingHolder.close right after we put the item back in the queue - synchronized (holder) { - holder.count.incrementAndGet(); - queue.offer(holder); - } - return holder; - } - - private class CountingHolder implements ResourceHolder, Comparable - { - private AtomicInteger count = new AtomicInteger(0); - private final T object; - - public CountingHolder(final T object) - { - this.object = object; - } - - @Override - public T get() - { - return object; - } - - /** - * Not idempotent, should only be called once when done using the resource - */ - @Override - public void close() - { - // ensures count always gets adjusted while item is removed from the queue - synchronized (this) { - // item may not be in queue if another thread is calling LoadBalancingPool.get() - // at the same time; in that case let the other thread put it back. - boolean removed = queue.remove(this); - count.decrementAndGet(); - if (removed) { - queue.offer(this); - } - } - } - - @Override - public int compareTo(CountingHolder o) - { - return Integer.compare(count.get(), o.count.get()); - } - - - @Override - protected void finalize() throws Throwable - { - try { - final int shouldBeZero = count.get(); - if (shouldBeZero != 0) { - log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, object); - } - } - finally { - super.finalize(); - } - } - } -} diff --git a/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java b/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java new file mode 100644 index 000000000000..e841db933e53 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java @@ -0,0 +1,141 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import io.druid.collections.ResourceHolder; +import io.druid.java.util.common.logger.Logger; +import net.spy.memcached.MemcachedClientIF; +import sun.misc.Cleaner; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Simple load balancing pool that always returns the least used {@link MemcachedClientIF}. + * + * A client's usage is incremented every time one gets requested from the pool + * and is decremented every time close is called on the holder. + * + * The pool eagerly instantiates all the clients in the pool when created, + * using the given supplier. + */ +final class MemcacheClientPool implements Supplier> +{ + private static final Logger log = new Logger(MemcacheClientPool.class); + + /** + * The number of memcached connections is not expected to be small (<= 8), so it's easier to find the least used + * connection using a linear search over a simple array, than fiddling with PriorityBlockingQueue. This also allows + * to reduce locking. + */ + private final CountingHolder[] connections; + + MemcacheClientPool(int capacity, Supplier generator) + { + Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0"); + Preconditions.checkNotNull(generator); + + CountingHolder[] connections = new CountingHolder[capacity]; + // eagerly instantiate all items in the pool + for(int i = 0; i < capacity; ++i) { + connections[i] = new CountingHolder(generator.get()); + } + // Assign the final field after filling the array to ensure visibility of elements + this.connections = connections; + } + + @Override + public synchronized ResourceHolder get() + { + CountingHolder leastUsedClientHolder = connections[0]; + int minCount = leastUsedClientHolder.count.get(); + for (int i = 1; i < connections.length; i++) { + CountingHolder clientHolder = connections[i]; + int count = clientHolder.count.get(); + if (count < minCount) { + leastUsedClientHolder = clientHolder; + minCount = count; + } + } + leastUsedClientHolder.count.incrementAndGet(); + return new IdempotentCloseableHolder(leastUsedClientHolder); + } + + private static class CountingHolder + { + private final AtomicInteger count = new AtomicInteger(0); + private final MemcachedClientIF clientIF; + @SuppressWarnings("unused") + private final Cleaner cleaner; + + private CountingHolder(final MemcachedClientIF clientIF) + { + this.clientIF = clientIF; + cleaner = Cleaner.create(this, new ClientLeakNotifier(count, clientIF)); + } + } + + private static class IdempotentCloseableHolder implements ResourceHolder + { + private CountingHolder countingHolder; + + private IdempotentCloseableHolder(CountingHolder countingHolder) + { + this.countingHolder = countingHolder; + } + + @Override + public MemcachedClientIF get() + { + return countingHolder.clientIF; + } + + @Override + public void close() + { + if (countingHolder != null) { + countingHolder.count.decrementAndGet(); + countingHolder = null; + } + } + } + + private static class ClientLeakNotifier implements Runnable + { + private final AtomicInteger count; + private final MemcachedClientIF clientIF; + + private ClientLeakNotifier(AtomicInteger count, MemcachedClientIF clientIF) + { + this.count = count; + this.clientIF = clientIF; + } + + @Override + public void run() + { + final int shouldBeZero = count.get(); + if (shouldBeZero != 0) { + log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, clientIF); + } + } + } +} diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index a5959a299cdc..ab63dce819db 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -35,7 +35,6 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.AbstractMonitor; -import io.druid.collections.LoadBalancingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; import io.druid.java.util.common.logger.Logger; @@ -359,7 +358,7 @@ public void updateHistogram(String name, int amount) final Supplier> clientSupplier; if (config.getNumConnections() > 1) { - clientSupplier = new LoadBalancingPool( + clientSupplier = new MemcacheClientPool( config.getNumConnections(), new Supplier() { From 7655209fa7cade16ed5b01dff75d9ea8279c2c4e Mon Sep 17 00:00:00 2001 From: leventov Date: Sat, 21 Jan 2017 20:46:35 -0600 Subject: [PATCH 2/4] Remove locking and don't override Object.finalize() in ReferenceCountingResourceHolder --- .../ReferenceCountingResourceHolder.java | 157 ++++++++++-------- 1 file changed, 87 insertions(+), 70 deletions(-) diff --git a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java index bb8cf158d797..9ebf01453ce9 100644 --- a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java @@ -19,27 +19,32 @@ package io.druid.collections; -import java.io.Closeable; -import java.util.concurrent.atomic.AtomicBoolean; - +import com.google.common.base.Throwables; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; +import sun.misc.Cleaner; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class ReferenceCountingResourceHolder implements ResourceHolder { private static final Logger log = new Logger(ReferenceCountingResourceHolder.class); - private final Object lock = new Object(); private final T object; private final Closeable closer; + private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean closed = new AtomicBoolean(false); + @SuppressWarnings("unused") + private final Cleaner cleaner; - private int refcount = 1; - private boolean didClose = false; - - public ReferenceCountingResourceHolder(final T object, final Closeable closer) + ReferenceCountingResourceHolder(final T object, final Closeable closer) { this.object = object; this.closer = closer; + this.cleaner = Cleaner.create(this, new CloserRunnable(object, closer, refCount)); } public static ReferenceCountingResourceHolder fromCloseable(final T object) @@ -50,92 +55,104 @@ public static ReferenceCountingResourceHolder fromClose @Override public T get() { - synchronized (lock) { - if (refcount <= 0) { - throw new ISE("Already closed!"); - } - - return object; + if (refCount.get() <= 0) { + throw new ISE("Already closed!"); } + return object; } public Releaser increment() { - synchronized (lock) { - if (refcount <= 0) { - throw new ISE("Already closed!"); - } + if (refCount.getAndIncrement() <= 0) { + refCount.decrementAndGet(); + throw new ISE("Already closed!"); + } - refcount++; + // This Releaser is supposed to be used from a single thread, so no synchronization/atomicity + return new Releaser() + { + boolean released = false; - return new Releaser() + @Override + public void close() { - final AtomicBoolean didRelease = new AtomicBoolean(); - - @Override - public void close() - { - if (didRelease.compareAndSet(false, true)) { - decrement(); - } else { - log.warn("WTF?! release called but we are already released!"); - } + if (!released) { + decrement(); + released = true; + } else { + log.warn(new ISE("Already closed"), "Already closed"); } - - @Override - protected void finalize() throws Throwable - { - if (didRelease.compareAndSet(false, true)) { - log.warn("Not released! Object was[%s], releasing on finalize of releaser.", object); - decrement(); - } - } - }; - } - } - - public int getReferenceCount() - { - synchronized (lock) { - return refcount; - } + } + }; } @Override public void close() { - synchronized (lock) { - if (!didClose) { - didClose = true; - decrement(); - } else { - log.warn(new ISE("Already closed!"), "Already closed"); - } + if (closed.compareAndSet(false, true)) { + decrement(); + } else { + log.warn(new ISE("Already closed"), "Already closed"); } } - @Override - protected void finalize() throws Throwable + private void decrement() { - synchronized (lock) { - if (!didClose) { - log.warn("Not closed! Object was[%s], closing on finalize of holder.", object); - didClose = true; - decrement(); + // Checking that the count is exactly equal to 0, rather than less or equal, helps to avoid calling closer.close() + // twice if there is a race with CloserRunnable. Such a race is possible and could be avoided only with + // reachabilityFence() (Java 9+) in ReferenceCountingResourceHolder's and Releaser's close() methods. + if (refCount.decrementAndGet() == 0) { + try { + closer.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); } } } - private void decrement() + private static class CloserRunnable implements Runnable { - synchronized (lock) { - refcount--; - if (refcount <= 0) { - try { - closer.close(); + private final Object object; + private final Closeable closer; + private final AtomicInteger refCount; + + private CloserRunnable(Object object, Closeable closer, AtomicInteger refCount) + { + this.object = object; + this.closer = closer; + this.refCount = refCount; + } + + @Override + public void run() + { + while (true) { + int count = refCount.get(); + if (count <= 0) { + return; } - catch (Exception e) { - log.error(e, "WTF?! Close failed, uh oh..."); + if (refCount.compareAndSet(count, 0)) { + try { + closer.close(); + return; + } + catch (Exception e) { + try { + log.error(e, "Exception in closer"); + } + catch (Exception ignore) { + // ignore + } + } + finally { + try { + log.warn("Not closed! Object was[%s]", object); + } + catch (Exception ignore) { + // ignore + } + } } } } From 3e55884108363632871dd5142bc08862916dc498 Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 23 Jan 2017 20:03:38 -0600 Subject: [PATCH 3/4] Add leak counts in ReferenceCountingResourceHolder and MemcacheClientPool. Add tests for ReferenceCountingResourceHolder and MemcacheClientPool --- .../ReferenceCountingResourceHolder.java | 9 ++ .../ReferenceCountingResourceHolderTest.java | 142 ++++++++++++++++++ .../client/cache/MemcacheClientPool.java | 20 ++- .../client/cache/MemcacheClientPoolTest.java | 82 ++++++++++ 4 files changed, 251 insertions(+), 2 deletions(-) create mode 100644 common/src/test/java/io/druid/collections/ReferenceCountingResourceHolderTest.java create mode 100644 server/src/test/java/io/druid/client/cache/MemcacheClientPoolTest.java diff --git a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java index 9ebf01453ce9..656b81057e15 100644 --- a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java @@ -28,11 +28,19 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class ReferenceCountingResourceHolder implements ResourceHolder { private static final Logger log = new Logger(ReferenceCountingResourceHolder.class); + private static final AtomicLong leakedResources = new AtomicLong(); + + public static long leakedResources() + { + return leakedResources.get(); + } + private final T object; private final Closeable closer; private final AtomicInteger refCount = new AtomicInteger(1); @@ -134,6 +142,7 @@ public void run() } if (refCount.compareAndSet(count, 0)) { try { + leakedResources.incrementAndGet(); closer.close(); return; } diff --git a/common/src/test/java/io/druid/collections/ReferenceCountingResourceHolderTest.java b/common/src/test/java/io/druid/collections/ReferenceCountingResourceHolderTest.java new file mode 100644 index 000000000000..ba73fe6cd163 --- /dev/null +++ b/common/src/test/java/io/druid/collections/ReferenceCountingResourceHolderTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ReferenceCountingResourceHolderTest +{ + @Test + public void testIdiomaticUsage() + { + // Smoke testing + for (int i = 0; i < 100; i++) { + runIdiomaticUsage(); + } + } + + private void runIdiomaticUsage() + { + final AtomicBoolean released = new AtomicBoolean(false); + final ReferenceCountingResourceHolder resourceHolder = makeReleasingHandler(released); + List threads = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Thread thread = new Thread() { + @Override + public void run() + { + try (Releaser r = resourceHolder.increment()) { + try { + Thread.sleep(1); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + }; + thread.start(); + threads.add(thread); + } + for (Thread thread : threads) { + try { + thread.join(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Assert.assertFalse(released.get()); + resourceHolder.close(); + Assert.assertTrue(released.get()); + } + + private ReferenceCountingResourceHolder makeReleasingHandler(final AtomicBoolean released) + { + return ReferenceCountingResourceHolder + .fromCloseable((Closeable) new Closeable() + { + @Override + public void close() throws IOException + { + released.set(true); + } + }); + } + + @Test(timeout = 60_000) + public void testResourceHandlerClearedByJVM() throws InterruptedException + { + if (System.getProperty("java.version").startsWith("1.7")) { + // This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because + // this test should ever pass on any version of Java to prove that ReferenceCountingResourceHolder doesn't + // introduce leaks itself and actually cleans the leaked resources. + return; + } + long initialLeakedResources = ReferenceCountingResourceHolder.leakedResources(); + final AtomicBoolean released = new AtomicBoolean(false); + makeReleasingHandler(released); // Don't store the handler in a variable and don't close it, the object leaked + verifyCleanerRun(released, initialLeakedResources); + } + + @Test(timeout = 60_000) + public void testResourceHandlerWithReleaserClearedByJVM() throws InterruptedException + { + if (System.getProperty("java.version").startsWith("1.7")) { + // This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because + // this test should ever pass on any version of Java to prove that ReferenceCountingResourceHolder doesn't + // introduce leaks itself and actually cleans the leaked resources. + return; + } + long initialLeakedResources = ReferenceCountingResourceHolder.leakedResources(); + final AtomicBoolean released = new AtomicBoolean(false); + // createDanglingReleaser() need to be a separate method because otherwise JVM preserves a ref to Holder on stack + // and Cleaner is not called + createDanglingReleaser(released); + verifyCleanerRun(released, initialLeakedResources); + } + + private void createDanglingReleaser(AtomicBoolean released) + { + try (ReferenceCountingResourceHolder handler = makeReleasingHandler(released)) { + handler.increment(); // Releaser not close, the object leaked + } + } + + private void verifyCleanerRun(AtomicBoolean released, long initialLeakedResources) throws InterruptedException + { + // Wait until Closer runs + for (int i = 0; i < 6000 && ReferenceCountingResourceHolder.leakedResources() == initialLeakedResources; i++) { + System.gc(); + byte[] garbage = new byte[10_000_000]; + Thread.sleep(10); + } + Assert.assertEquals(initialLeakedResources + 1, ReferenceCountingResourceHolder.leakedResources()); + // Cleaner also runs the closer + Assert.assertTrue(released.get()); + } +} diff --git a/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java b/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java index e841db933e53..440d9557c106 100644 --- a/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java +++ b/server/src/main/java/io/druid/client/cache/MemcacheClientPool.java @@ -19,6 +19,7 @@ package io.druid.client.cache; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import io.druid.collections.ResourceHolder; @@ -27,6 +28,7 @@ import sun.misc.Cleaner; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Simple load balancing pool that always returns the least used {@link MemcachedClientIF}. @@ -41,6 +43,13 @@ final class MemcacheClientPool implements Supplier get() + public synchronized IdempotentCloseableHolder get() { CountingHolder leastUsedClientHolder = connections[0]; int minCount = leastUsedClientHolder.count.get(); @@ -93,7 +102,8 @@ private CountingHolder(final MemcachedClientIF clientIF) } } - private static class IdempotentCloseableHolder implements ResourceHolder + @VisibleForTesting + static class IdempotentCloseableHolder implements ResourceHolder { private CountingHolder countingHolder; @@ -108,6 +118,11 @@ public MemcachedClientIF get() return countingHolder.clientIF; } + int count() + { + return countingHolder.count.get(); + } + @Override public void close() { @@ -134,6 +149,7 @@ public void run() { final int shouldBeZero = count.get(); if (shouldBeZero != 0) { + leakedClients.incrementAndGet(); log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, clientIF); } } diff --git a/server/src/test/java/io/druid/client/cache/MemcacheClientPoolTest.java b/server/src/test/java/io/druid/client/cache/MemcacheClientPoolTest.java new file mode 100644 index 000000000000..0ce83617a333 --- /dev/null +++ b/server/src/test/java/io/druid/client/cache/MemcacheClientPoolTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.google.common.base.Suppliers; +import net.spy.memcached.MemcachedClientIF; +import org.junit.Assert; +import org.junit.Test; + +public class MemcacheClientPoolTest +{ + + @Test + public void testSimpleUsage() + { + MemcacheClientPool pool = new MemcacheClientPool(3, Suppliers.ofInstance((MemcachedClientIF) null)); + // First round + MemcacheClientPool.IdempotentCloseableHolder first = pool.get(); + Assert.assertEquals(1, first.count()); + MemcacheClientPool.IdempotentCloseableHolder second = pool.get(); + Assert.assertEquals(1, second.count()); + MemcacheClientPool.IdempotentCloseableHolder third = pool.get(); + Assert.assertEquals(1, third.count()); + // Second round + MemcacheClientPool.IdempotentCloseableHolder firstClientSecondRound = pool.get(); + Assert.assertEquals(2, firstClientSecondRound.count()); + MemcacheClientPool.IdempotentCloseableHolder secondClientSecondRound = pool.get(); + Assert.assertEquals(2, secondClientSecondRound.count()); + first.close(); + firstClientSecondRound.close(); + MemcacheClientPool.IdempotentCloseableHolder firstAgain = pool.get(); + Assert.assertEquals(1, firstAgain.count()); + + firstAgain.close(); + second.close(); + third.close(); + secondClientSecondRound.close(); + } + + @Test + public void testClientLeakDetected() throws InterruptedException + { + if (System.getProperty("java.version").startsWith("1.7")) { + // This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because + // this test should ever pass on any version of Java to prove that MemcacheClientPool doesn't introduce leaks + // itself. + return; + } + long initialLeakedClients = MemcacheClientPool.leakedClients(); + createDanglingClient(); + // Wait until Closer runs + for (int i = 0; i < 6000 && MemcacheClientPool.leakedClients() == initialLeakedClients; i++) { + System.gc(); + byte[] garbage = new byte[10_000_000]; + Thread.sleep(10); + } + Assert.assertEquals(initialLeakedClients + 1, MemcacheClientPool.leakedClients()); + } + + private void createDanglingClient() + { + MemcacheClientPool pool = new MemcacheClientPool(1, Suppliers.ofInstance((MemcachedClientIF) null)); + pool.get(); + } +} From b1c20eda3c591f54faba421643b99905de18a29f Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 24 Jan 2017 20:03:06 -0600 Subject: [PATCH 4/4] Fix a race condition in ReferenceCountingResourceHolder.increment() --- .../collections/ReferenceCountingResourceHolder.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java index 656b81057e15..48a0cdd1640c 100644 --- a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java @@ -71,9 +71,14 @@ public T get() public Releaser increment() { - if (refCount.getAndIncrement() <= 0) { - refCount.decrementAndGet(); - throw new ISE("Already closed!"); + while (true) { + int count = this.refCount.get(); + if (count <= 0) { + throw new ISE("Already closed!"); + } + if (refCount.compareAndSet(count, count + 1)) { + break; + } } // This Releaser is supposed to be used from a single thread, so no synchronization/atomicity