Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.barriers;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
Expand All @@ -37,7 +38,7 @@
*/
public class DistributedBarrier
{
private final CuratorFramework client;
private final WatcherRemoveCuratorFramework client;
private final String barrierPath;
private final Watcher watcher = new Watcher()
{
Expand All @@ -54,7 +55,7 @@ public void process(WatchedEvent event)
*/
public DistributedBarrier(CuratorFramework client, String barrierPath)
{
this.client = client;
this.client = client.newWatcherRemoveCuratorFramework();
this.barrierPath = PathUtils.validatePath(barrierPath);
}

Expand Down Expand Up @@ -92,6 +93,16 @@ public synchronized void removeBarrier() throws Exception
}
}

/**
* Indicates if the barrier is set or not.
*
* @return true if the barrier is set.
*/
public synchronized boolean isSet() throws Exception
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotated with @VisibleForTesting.

{
return client.checkExists().forPath(barrierPath) != null;
}

/**
* Blocks until the barrier node comes into existence
*
Expand All @@ -117,29 +128,36 @@ public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) thro
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

boolean result;
for(;;)
try
{
result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
if ( result )
{
break;
}

if ( hasMaxWait )
for(;;)
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if ( thisWaitMs <= 0 )
result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
if ( result )
{
break;
}
wait(thisWaitMs);
}
else
{
wait();

if ( hasMaxWait )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if ( thisWaitMs <= 0 )
{
break;
}
wait(thisWaitMs);
}
else
{
wait();
}
}
}
finally
{
client.removeWatchers();
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
*/
package org.apache.curator.framework.recipes.barriers;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import com.google.common.collect.Lists;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -36,6 +35,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;

public class TestDistributedBarrier extends BaseClassForTests
{
@Test
Expand Down Expand Up @@ -218,4 +230,48 @@ public Object call() throws Exception
client.close();
}
}

@Test
public void testIsSet() throws Exception
{
try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
client.start();

final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.setBarrier();

assertTrue(barrier.isSet());
}
}

@Test
public void testIsNotSet() throws Exception
{
try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
client.start();

final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.setBarrier();
barrier.removeBarrier();

assertFalse(barrier.isSet());
}
}

@Test
public void testWatchersRemoved() throws Exception
{
CuratorFramework client = mock(CuratorFramework.class);
WatcherRemoveCuratorFramework watcherRemoveClient = mock(WatcherRemoveCuratorFramework.class);
ExistsBuilder existsBuilder = mock(ExistsBuilder.class);

when(client.newWatcherRemoveCuratorFramework()).thenReturn(watcherRemoveClient);
when(watcherRemoveClient.checkExists()).thenReturn(existsBuilder);
when(existsBuilder.usingWatcher(any(Watcher.class))).thenReturn(existsBuilder);

final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.waitOnBarrier(1, TimeUnit.SECONDS);
verify(watcherRemoveClient, atLeastOnce()).removeWatchers();
}
Comment on lines +262 to +275
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add watches and verify the following events triggered:

  • DataWatchRemoved
  • ChildWatchRemoved
  • PersistentWatchRemoved

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it might be particular important to test this as ZooKeeper.removeWatches is cheating since ZOOKEEPER-1910. See also kezhuw/zookeeper-client-rust#2.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kezhuw I agree that it can be a bug as I mentioned in https://lists.apache.org/thread/0kcnklcxs0s5656c1sbh3crgdodbb0qg. You can reply on the mailing list and file an issue.


}