From ec6fded0de82232622e97ccfa9d6f97062d2637d Mon Sep 17 00:00:00 2001 From: stulscott Date: Thu, 6 Oct 2022 13:00:32 +1000 Subject: [PATCH] CURATOR-654: Remove watcher after waiting on barrier. --- .../recipes/barriers/DistributedBarrier.java | 54 +++++++++----- .../barriers/TestDistributedBarrier.java | 72 ++++++++++++++++--- 2 files changed, 100 insertions(+), 26 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java index fb00ed9ae1..75159dae22 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.barriers; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -37,7 +38,7 @@ */ public class DistributedBarrier { - private final CuratorFramework client; + private final WatcherRemoveCuratorFramework client; private final String barrierPath; private final Watcher watcher = new Watcher() { @@ -54,7 +55,7 @@ public void process(WatchedEvent event) */ public DistributedBarrier(CuratorFramework client, String barrierPath) { - this.client = client; + this.client = client.newWatcherRemoveCuratorFramework(); this.barrierPath = PathUtils.validatePath(barrierPath); } @@ -92,6 +93,16 @@ public synchronized void removeBarrier() throws Exception } } + /** + * Indicates if the barrier is set or not. + * + * @return true if the barrier is set. + */ + public synchronized boolean isSet() throws Exception + { + return client.checkExists().forPath(barrierPath) != null; + } + /** * Blocks until the barrier node comes into existence * @@ -117,29 +128,36 @@ public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) thro long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE; boolean result; - for(;;) + try { - result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null); - if ( result ) - { - break; - } - - if ( hasMaxWait ) + for(;;) { - long elapsed = System.currentTimeMillis() - startMs; - long thisWaitMs = maxWaitMs - elapsed; - if ( thisWaitMs <= 0 ) + result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null); + if ( result ) { break; } - wait(thisWaitMs); - } - else - { - wait(); + + if ( hasMaxWait ) + { + long elapsed = System.currentTimeMillis() - startMs; + long thisWaitMs = maxWaitMs - elapsed; + if ( thisWaitMs <= 0 ) + { + break; + } + wait(thisWaitMs); + } + else + { + wait(); + } } } + finally + { + client.removeWatchers(); + } return result; } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/barriers/TestDistributedBarrier.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/barriers/TestDistributedBarrier.java index f43a943eba..25a56cb383 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/barriers/TestDistributedBarrier.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/barriers/TestDistributedBarrier.java @@ -18,16 +18,15 @@ */ package org.apache.curator.framework.recipes.barriers; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import com.google.common.collect.Lists; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryOneTime; -import org.apache.zookeeper.KeeperException; -import org.junit.jupiter.api.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.List; import java.util.concurrent.Callable; @@ -36,6 +35,19 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; +import org.apache.curator.framework.api.ExistsBuilder; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Lists; + public class TestDistributedBarrier extends BaseClassForTests { @Test @@ -218,4 +230,48 @@ public Object call() throws Exception client.close(); } } + + @Test + public void testIsSet() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) { + client.start(); + + final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier"); + barrier.setBarrier(); + + assertTrue(barrier.isSet()); + } + } + + @Test + public void testIsNotSet() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) { + client.start(); + + final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier"); + barrier.setBarrier(); + barrier.removeBarrier(); + + assertFalse(barrier.isSet()); + } + } + + @Test + public void testWatchersRemoved() throws Exception + { + CuratorFramework client = mock(CuratorFramework.class); + WatcherRemoveCuratorFramework watcherRemoveClient = mock(WatcherRemoveCuratorFramework.class); + ExistsBuilder existsBuilder = mock(ExistsBuilder.class); + + when(client.newWatcherRemoveCuratorFramework()).thenReturn(watcherRemoveClient); + when(watcherRemoveClient.checkExists()).thenReturn(existsBuilder); + when(existsBuilder.usingWatcher(any(Watcher.class))).thenReturn(existsBuilder); + + final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier"); + barrier.waitOnBarrier(1, TimeUnit.SECONDS); + verify(watcherRemoveClient, atLeastOnce()).removeWatchers(); + } + }