diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 150b3d4fcc..85bfdc0843 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -18,7 +18,13 @@ */ package org.apache.samza.coordinator; +import org.apache.samza.SamzaException; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.zk.ZkCoordinationUtils; +import org.apache.samza.zk.ZkJobCoordinatorFactory; + /** * @@ -31,14 +37,17 @@ @InterfaceStability.Evolving public interface CoordinationUtils { - /** - * reset the internal structure. Does not happen automatically with stop() - */ - void reset(); + static CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) { + + // currently we figure out if it is ZK based utilities by checking JobCoordinatorConfig.JOB_COORDINATOR_FACTORY + String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); + if (!ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) + throw new SamzaException(String.format("Samza supports only ZK based coordination utilities with %s == %s", + JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName())); - // facilities for group coordination - LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId + return new ZkCoordinationUtils(groupId, participantId, config); + } - Latch getLatch(int size, String latchId); + DistributedLock getLock(String lockId); } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java similarity index 60% rename from samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java rename to samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java index 497d3e0f00..13c3d0e80e 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java @@ -16,21 +16,29 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.coordinator; -import org.apache.samza.config.Config; +import java.util.concurrent.TimeUnit; -/** - * factory to instantiate a c{@link CoordinationUtils} service - */ -public interface CoordinationServiceFactory { +public interface DistributedLock { + + /** + * Tries to acquire the lock + * @param timeout Duration of lock acquiring timeout. + * @param unit Time Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. + */ + boolean lock(long timeout, TimeUnit unit); + + /** + * Releases the lock + */ + void unlock(); + /** - * get a unique service instance - * @param groupId - unique id to identify the service - * @param participantId - a unique id that identifies the participant in the service - * @param updatedConfig - configs, to define the details of the service - * @return a unique service instance + * perform any required action for closing */ - CoordinationUtils getCoordinationService(String groupId, String participantId, Config updatedConfig); -} + void close(); +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index b0bfc8a62d..0b113b601c 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -21,23 +21,24 @@ import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -46,8 +47,6 @@ import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.task.TaskFactoryUtil; -import org.apache.samza.zk.ZkCoordinationServiceFactory; -import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,13 +56,11 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); - // Latch id that's used for awaiting the init of application before creating the StreamProcessors - private static final String INIT_LATCH_ID = "init"; - // Latch timeout is set to 10 min - private static final long LATCH_TIMEOUT_MINUTES = 10; + // Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently. + private static final long LOCK_TIMEOUT_SECONDS = 10; private final String uid; - private final CoordinationUtils coordinationUtils; + private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); @@ -122,9 +119,6 @@ private void shutdownAndNotify() { } } - if (coordinationUtils != null) { - coordinationUtils.reset(); - } shutdownLatch.countDown(); } } @@ -132,7 +126,6 @@ private void shutdownAndNotify() { public LocalApplicationRunner(Config config) { super(config); uid = UUID.randomUUID().toString(); - coordinationUtils = createCoordinationUtils(); } @Override @@ -207,45 +200,54 @@ public void waitForFinish() { } /** - * Create the {@link CoordinationUtils} needed by the application runner, or null if it's not configured. - * @return an instance of {@link CoordinationUtils} + * Generates a unique lock ID which is consistent for all processors within the same application lifecycle. + * Each {@code StreamSpec} has an ID that is unique and is used for hashcode computation. + * @param intStreams list of {@link StreamSpec}s + * @return lock ID */ - /* package private */ CoordinationUtils createCoordinationUtils() { - String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); - - // TODO: we will need a better way to package the configs with application runner - if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) { - ApplicationConfig appConfig = new ApplicationConfig(config); - return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config); - } else { - return null; - } + private String generateLockId(List intStreams) { + return String.valueOf(Objects.hashCode(intStreams.stream() + .map(StreamSpec::getId) + .collect(Collectors.toList()))); } /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader - * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes - * stream creation. + * If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams. + * All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time. + * After stream creation, they will unlock and proceed normally. * @param intStreams list of intermediate {@link StreamSpec}s - * @throws Exception exception for latch timeout */ - /* package private */ void createStreams(List intStreams) throws Exception { - if (!intStreams.isEmpty()) { - if (coordinationUtils != null) { - Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID); - LeaderElector leaderElector = coordinationUtils.getLeaderElector(); - leaderElector.setLeaderElectorListener(() -> { - getStreamManager().createStreams(intStreams); - initLatch.countDown(); - }); - leaderElector.tryBecomeLeader(); - initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); - } else { - // each application process will try creating the streams, which - // requires stream creation to be idempotent - getStreamManager().createStreams(intStreams); + private void createStreams(List intStreams) { + if (intStreams.isEmpty()) { + return; + } + + DistributedLock initLock = null; + try { + CoordinationUtils coordinationUtils = CoordinationUtils.getCoordinationUtils( + new ApplicationConfig(config).getGlobalAppId(), uid, config); + initLock = coordinationUtils.getLock(generateLockId(intStreams)); + } catch (SamzaException e) { + LOG.warn("Coordination utils are not available.", e); + } + + if (initLock != null) { + try { + boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (hasLock) { + getStreamManager().createStreams(intStreams); + initLock.unlock(); + } else { + LOG.error("Timed out while trying to acquire lock.", new TimeoutException()); + } + } finally { + initLock.close(); } + } else { + // each application process will try creating the streams, which + // requires stream creation to be idempotent + getStreamManager().createStreams(intStreams); } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java deleted file mode 100644 index 1dd5ec9362..0000000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.zk; - -import com.google.common.base.Strings; -import org.I0Itec.zkclient.ZkClient; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.CoordinationServiceFactory; -import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.util.NoOpMetricsRegistry; -import org.apache.zookeeper.client.ConnectStringParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { - private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); - - public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { - ZkConfig zkConfig = new ZkConfig(config); - - ZkClient zkClient = - createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - - ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); - return new ZkCoordinationUtils(participantId, zkConfig, zkUtils); - } - - /** - * helper method to create zkClient - * @param connectString - zkConnect string - * @param sessionTimeoutMS - session timeout - * @param connectionTimeoutMs - connection timeout - * @return zkClient object - */ - public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { - ZkClient zkClient; - try { - zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); - } catch (Exception e) { - // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. - throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); - } - - // make sure the namespace in zk exists (if specified) - validateZkNameSpace(connectString, zkClient); - - return zkClient; - } - - /** - * if ZkConnectString contains namespace path at the end, but it does not exist we should fail - * @param zkConnect - connect string - * @param zkClient - zkClient object to talk to the ZK - */ - public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) { - ConnectStringParser parser = new ConnectStringParser(zkConnect); - - String path = parser.getChrootPath(); - if (Strings.isNullOrEmpty(path)) { - return; // no namespace path - } - - LOG.info("connectString = " + zkConnect + "; path =" + path); - - // if namespace specified (path above) but "/" does not exists, we will fail - if (!zkClient.exists("/")) { - throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect); - } - } -} diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index f5dda2e880..1580f3a732 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -16,52 +16,79 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.zk; -import org.I0Itec.zkclient.exception.ZkInterruptedException; +import com.google.common.base.Strings; +import java.util.concurrent.TimeUnit; +import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.DistributedLock; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.apache.zookeeper.client.ConnectStringParser; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ZkCoordinationUtils implements CoordinationUtils { - private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtils.class); + private final String participantId; + private final ZkUtils zkUtils; + + private final static Logger LOG = org.slf4j.LoggerFactory.getLogger(ZkCoordinationUtils.class); + + public ZkCoordinationUtils(String groupId, String participantId, Config config) { + + this.participantId = participantId; + ZkConfig zkConfig = new ZkConfig(config); - public final ZkConfig zkConfig; - public final ZkUtils zkUtils; - public final String processorIdStr; + ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils) { - this.zkConfig = zkConfig; - this.zkUtils = zkUtils; - this.processorIdStr = processorId; + this.zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); + LOG.info("Created ZkCoordinationUtils. zkUtils = " + zkUtils + "; groupid=" + groupId); } - @Override - public void reset() { + // + // static auxiliary methods + // + public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { + ZkClient zkClient; try { - zkUtils.close(); - } catch (ZkInterruptedException ex) { - // Swallowing due to occurrence in the last stage of lifecycle(Not actionable). - LOG.error("Exception in reset: ", ex); + zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); + } catch (Exception e) { + // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. + throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); } - } - @Override - public LeaderElector getLeaderElector() { - return new ZkLeaderElector(processorIdStr, zkUtils); + // make sure the namespace in zk exists (if specified) + validateZkNameSpace(connectString, zkClient); + + return zkClient; } + /** + * if ZkConnectString contains namespace path at the end, but it does not exist we should fail + * @param zkConnect - connect string + * @param zkClient - zkClient object to talk to the ZK + */ + public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) { + ConnectStringParser parser = new ConnectStringParser(zkConnect); - @Override - public Latch getLatch(int size, String latchId) { - return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils); + String path = parser.getChrootPath(); + if (Strings.isNullOrEmpty(path)) { + return; // no namespace path + } + + //LOG.info("connectString = " + zkConnect + "; path =" + path); + + // if namespace specified (path above) but "/" does not exists, we will fail + if (!zkClient.exists("/")) { + throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect); + } } - // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer - public ZkUtils getZkUtils() { - return zkUtils; + @Override + public DistributedLock getLock(String initLockId) { + return new ZkDistributedLock(participantId, zkUtils, initLockId); } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java new file mode 100644 index 0000000000..a29578c306 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.DistributedLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed lock primitive for Zookeeper. + */ +public class ZkDistributedLock implements DistributedLock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); + private final ZkUtils zkUtils; + private final String lockPath; + private final String participantId; + private final ZkKeyBuilder keyBuilder; + private final Random random = new Random(); + private String nodePath = null; + + public ZkDistributedLock(String participantId, ZkUtils zkUtils, String initLockPath) { + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = zkUtils.getKeyBuilder(); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); + zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); + } + + /** + * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. + */ + @Override + public boolean lock(long timeout, TimeUnit unit) { + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); + + //Start timer for timeout + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + + while ((System.currentTimeMillis() - startTime) < lockTimeout) { + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + // Acquires lock when the node has the lowest sequence number and returns. + if (index == 0) { + LOG.info("Acquired lock for participant id: {}", participantId); + return true; + } else { + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Trying to acquire lock again..."); + } + } + return false; + } + + /** + * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. + */ + @Override + public void unlock() { + if (nodePath != null) { + zkUtils.getZkClient().delete(nodePath); + nodePath = null; + LOG.info("Ephemeral lock node deleted. Unlocked!"); + } else { + LOG.warn("Ephemeral lock node you want to delete doesn't exist"); + } + } + + @Override + public void close() { + zkUtils.close(); + } +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 9f64b3aba6..5614309c0c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -24,8 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import org.I0Itec.zkclient.IZkStateListener; import java.util.Set; +import org.I0Itec.zkclient.IZkStateListener; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 85e3b4ab55..4a8137be1b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -51,7 +51,8 @@ public JobCoordinator getJobCoordinator(Config config) { private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) { ZkConfig zkConfig = new ZkConfig(config); ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); - ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + ZkClient zkClient = ZkCoordinationUtils + .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 5df7114730..2cb09da29c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -90,8 +90,6 @@ public int getGeneration() { return currentGeneration.get(); } - - public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) { this.keyBuilder = zkKeyBuilder; this.connectionTimeoutMs = connectionTimeoutMs; diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index a04bd3b06f..57e07d6d12 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -173,6 +173,9 @@ public void resignLeadership() {} public boolean amILeader() { return false; } + + @Override + public void close() { } }; Latch latch = new Latch() { @@ -188,10 +191,13 @@ public void await(long timeout, TimeUnit tu) public void countDown() { done = true; } + + @Override + public void close() { } }; when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector); when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch); - doReturn(coordinationUtils).when(spy).createCoordinationUtils(); + //doReturn(coordinationUtils).when(spy).createCoordinationUtils(); try { spy.run(app); diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index b5953d1a5f..93f352286c 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -121,7 +121,8 @@ public void testGetProcessorsIDs() { zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1")); List l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(1, l.size()); - new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2")); + new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()). + registerProcessorAndGetId(new ProcessorData("host2", "2")); l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(2, l.size());