diff --git a/src/main/java/com/basho/riak/client/api/RiakClient.java b/src/main/java/com/basho/riak/client/api/RiakClient.java index 6ce3d0677..114c31b38 100644 --- a/src/main/java/com/basho/riak/client/api/RiakClient.java +++ b/src/main/java/com/basho/riak/client/api/RiakClient.java @@ -427,4 +427,17 @@ public RiakCluster getRiakCluster() { return cluster; } + + /** + * Cleans up any Thread-Local variables after shutdown. + * This operation is useful when you are in a container environment, and you + * do not want to leave the thread local variables in the threads you do not manage. + * Call this method when your application is being unloaded from the container, after + * all {@link RiakNode}, {@link RiakCluster}, and {@link com.basho.riak.client.api.RiakClient} + * objects are in the shutdown state. + */ + public void cleanup() + { + cluster.cleanup(); + } } diff --git a/src/main/java/com/basho/riak/client/core/RiakCluster.java b/src/main/java/com/basho/riak/client/core/RiakCluster.java index 91f5f0a32..e49a84e83 100644 --- a/src/main/java/com/basho/riak/client/core/RiakCluster.java +++ b/src/main/java/com/basho/riak/client/core/RiakCluster.java @@ -635,6 +635,21 @@ public static Builder builder(RiakNode node) return new Builder(node); } + /** + * Cleans up any Thread-Local variables after shutdown. + * This operation is useful when you are in a container environment, and you + * do not want to leave the thread local variables in the threads you do not manage. + * Call this method when your application is being unloaded from the container, after + * all {@link RiakNode}, {@link RiakCluster}, and {@link com.basho.riak.client.api.RiakClient} + * objects are in the shutdown state. + */ + public synchronized void cleanup() + { + stateCheck(State.SHUTDOWN); + io.netty.util.concurrent.FastThreadLocal.removeAll(); + io.netty.util.concurrent.FastThreadLocal.destroy(); + } + /** * Builder used to create {@link RiakCluster} instances. */ diff --git a/src/test/java/com/basho/riak/client/core/RiakClusterTest.java b/src/test/java/com/basho/riak/client/core/RiakClusterTest.java index a19ed3b72..9eb48c080 100644 --- a/src/test/java/com/basho/riak/client/core/RiakClusterTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakClusterTest.java @@ -16,6 +16,7 @@ package com.basho.riak.client.core; import com.google.protobuf.Message; +import io.netty.util.concurrent.FastThreadLocal; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; @@ -30,14 +31,14 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; - +import static org.powermock.api.mockito.PowerMockito.verifyStatic; /** * * @author Brian Roach * @author Sergey Galkin */ @RunWith(PowerMockRunner.class) -@PrepareForTest(FutureOperation.class) +@PrepareForTest({FutureOperation.class, FastThreadLocal.class}) public class RiakClusterTest { @Test @@ -222,6 +223,37 @@ public void nodeOperationQueue() throws Exception verify(nodeManager, times(1)).executeOnNode(operation4, null); } + @Test + public void testCleanup() throws Exception + { + RiakNode node = mock(RiakNode.class); + RiakNode.Builder nodeBuilder = spy(new RiakNode.Builder()); + doReturn(node).when(nodeBuilder).build(); + PowerMockito.mockStatic(FastThreadLocal.class); + PowerMockito.doNothing().when(FastThreadLocal.class, "destroy"); + PowerMockito.doNothing().when(FastThreadLocal.class, "removeAll"); + + RiakCluster cluster = new RiakCluster.Builder(nodeBuilder.build()).build(); + Whitebox.setInternalState(cluster, "state", RiakCluster.State.SHUTDOWN); + + cluster.cleanup(); + + verifyStatic(times(2)); + } + + @Test(expected = IllegalStateException.class) + public void testCleanupNotShutdown() + { + RiakNode node = mock(RiakNode.class); + RiakNode.Builder nodeBuilder = spy(new RiakNode.Builder()); + doReturn(node).when(nodeBuilder).build(); + + RiakCluster cluster = new RiakCluster.Builder(nodeBuilder.build()).build(); + Whitebox.setInternalState(cluster, "state", RiakCluster.State.RUNNING); + + cluster.cleanup(); + } + private void assertQueueStatus(RiakCluster cluster, Integer expectedQueueSize, RiakCluster.State expectedClusterState, FutureOperation expectedQueueHead)