-
Notifications
You must be signed in to change notification settings - Fork 331
SAMZA-1372: Change Latch Interface to Lock Interface for Samza Standalone with ZK #264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8fb1ef6
8ec4e59
9750f2d
83a9644
afbddac
03f1bc7
774903a
80503f8
23849c8
5cf9d37
6a790b2
ad94c91
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.samza.coordinator; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
|
|
||
| public interface DistributedLock { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to add javadoc for public interface classes. |
||
|
|
||
| /** | ||
| * Tries to acquire the lock | ||
| * @param timeout Duration of lock acquiring timeout. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: timeout to acquire a lock? Or duration of lock after acquired? I think that you were referring to the first. It would be better to clearly state it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is the timeout to acquire the lock. Will make it more clear in the documentation. |
||
| * @param unit Time Unit of the timeout defined above. | ||
| * @return true if lock is acquired successfully, false if it times out. | ||
| */ | ||
| boolean lock(long timeout, TimeUnit unit); | ||
|
|
||
| /** | ||
| * Releases the lock | ||
| */ | ||
| void unlock(); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be helpful to have a close() method in the lock to be able to close the connection (separately from the unlock)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sborya I think that we should wait a bit to see some more distributed lock implementation. If all distributed lock implementation would require init() to open a distributed resource and close() to close a distributed resource, we should add both APIs to this interface. If not, then init() and close() would be per implementation.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When there’s a failure in unlock, a client of DistributedLock will not be able to distinguish if it’s because of releasing resources or due to unlock implementation. A lock owner wanting to release a lock to other processor in group, will not be able to conclude effectively if unlock has happened or not in case of exceptions(Due to two possibilities and will not be able to handle exception effectively). Also, logically with just lock, unlock operations a client can call lock() after unlock(), essentially trying to join again which wouldn’t be possible if resource establishment happens in constructor/factory.create method(instead of separate init() lifecycle method). Since any DistributedLock implementation will require resource establishment with a RemoteService(Zookeeper, AzureBlobStore, any), extending Autocloseable and having init() lifecycle methods, we would be clearly defining boundaries and communicating that intent with our interface(Similar to I’m not suggesting that we should do it, just interested to know your opinion. Can you please point out why this reasoning is incorrect(not required now). Please share your thoughts.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @shanthoosh please see my comments on the lock interface definition. You point here is exactly what I tried to say in the interface definition. You can have the lock itself maintain a heart-beat/life-cycle/lease to the remote resource, which may not require init()/close() in that case. However, it seems necessary to define a callback to the lock interface s.t. user can react to loss of possession of remote resource. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,9 +36,9 @@ | |
| import org.apache.samza.config.JobConfig; | ||
| import org.apache.samza.config.JobCoordinatorConfig; | ||
| import org.apache.samza.config.TaskConfig; | ||
| import org.apache.samza.coordinator.CoordinationServiceFactory; | ||
| import org.apache.samza.coordinator.CoordinationUtils; | ||
| import org.apache.samza.coordinator.Latch; | ||
| import org.apache.samza.coordinator.LeaderElector; | ||
| import org.apache.samza.coordinator.DistributedLock; | ||
| import org.apache.samza.execution.ExecutionPlan; | ||
| import org.apache.samza.job.ApplicationStatus; | ||
| import org.apache.samza.processor.StreamProcessor; | ||
|
|
@@ -47,8 +47,7 @@ | |
| import org.apache.samza.task.AsyncStreamTaskFactory; | ||
| import org.apache.samza.task.StreamTaskFactory; | ||
| import org.apache.samza.task.TaskFactoryUtil; | ||
| import org.apache.samza.zk.ZkCoordinationServiceFactory; | ||
| import org.apache.samza.zk.ZkJobCoordinatorFactory; | ||
| import org.apache.samza.util.Util; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -59,9 +58,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { | |
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); | ||
| private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"; | ||
| // Latch timeout is set to 10 min | ||
| private static final long LATCH_TIMEOUT_MINUTES = 10; | ||
| private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000; | ||
| // Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently. | ||
| private static final long LOCK_TIMEOUT_SECONDS = 10; | ||
| private static final String ZK_COORDINATION_CLASS = "org.apache.samza.zk.ZkJobCoordinatorFactory"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should avoid more and more ZK-specific names/details to bleed into the general implementation. LocalApplicationRunner is not just for ZK-based implementation. Hence, I would recommend not to hard-code any actual ZK implementation classes here in the LocalApplicationRunner.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nickpan47 I agree. but we have to do this string matching because we decided that the user will not configure a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @navina I am OK if the config has a default, which is ZK-specific. I am against the hard-code ZK-specific class names in the LocalApplicationRunner implementation. I would suggest that in the LocalApplicationRunner, we load the CoordinatorServiceFactory via Utils.getInstance(JobConfig.getJobCoordinatorFactory()), w/o hard-code the ZK job coordinator class names in the LocalApplicationRunner. |
||
| private static final String ZK_COORDINATION_SERVICE_CLASS = "org.apache.samza.zk.ZkCoordinationServiceFactory"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. We should only use the general CoordinatorServiceFactory interface in LocalApplicationRunner, not the ZK-specific implementation. |
||
|
|
||
| private final String uid; | ||
| private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet(); | ||
|
|
@@ -214,9 +214,9 @@ public void waitForFinish() { | |
| String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); | ||
|
|
||
| // TODO: we will need a better way to package the configs with application runner | ||
| if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) { | ||
| if (ZK_COORDINATION_CLASS.equals(jobCoordinatorFactoryClassName)) { | ||
| ApplicationConfig appConfig = new ApplicationConfig(config); | ||
| return new ZkCoordinationServiceFactory().getCoordinationService( | ||
| return Util.<CoordinationServiceFactory>getObj(ZK_COORDINATION_SERVICE_CLASS).getCoordinationService( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove the ZK-specific code here. |
||
| appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config); | ||
| } else { | ||
| return null; | ||
|
|
@@ -225,31 +225,28 @@ public void waitForFinish() { | |
|
|
||
| /** | ||
| * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. | ||
| * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader | ||
| * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes | ||
| * stream creation. | ||
| * If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams. | ||
| * All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time. | ||
| * After stream creation, they will unlock and proceed normally. | ||
| * @param planId a unique identifier representing the plan used for coordination purpose | ||
| * @param intStreams list of intermediate {@link StreamSpec}s | ||
| * @throws TimeoutException exception for latch timeout | ||
| * @throws TimeoutException exception for lock timeout | ||
| */ | ||
| /* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException { | ||
| /* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: unnecessary extra leading white space. |
||
| if (!intStreams.isEmpty()) { | ||
| // Move the scope of coordination utils within stream creation to address long idle connection problem. | ||
| // Refer SAMZA-1385 for more details | ||
| CoordinationUtils coordinationUtils = createCoordinationUtils(); | ||
| if (coordinationUtils != null) { | ||
| Latch initLatch = coordinationUtils.getLatch(1, planId); | ||
|
|
||
| DistributedLock initLock = coordinationUtils.getLock(planId); | ||
| try { | ||
| // check if the processor needs to go through leader election and stream creation | ||
| if (shouldContestInElectionForStreamCreation(initLatch)) { | ||
| LeaderElector leaderElector = coordinationUtils.getLeaderElector(); | ||
| leaderElector.setLeaderElectorListener(() -> { | ||
| getStreamManager().createStreams(intStreams); | ||
| initLatch.countDown(); | ||
| }); | ||
| leaderElector.tryBecomeLeader(); | ||
| initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); | ||
| boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); | ||
| if (hasLock) { | ||
| getStreamManager().createStreams(intStreams); | ||
| LOG.info("Created intermediate streams. Unlocking now..."); | ||
| initLock.unlock(); | ||
| } else { | ||
| LOG.error("Timed out while trying to acquire lock.", new TimeoutException()); | ||
| } | ||
| } finally { | ||
| coordinationUtils.reset(); | ||
|
|
@@ -285,31 +282,4 @@ StreamProcessor createStreamProcessor( | |
| taskFactory.getClass().getCanonicalName())); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying | ||
| * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election. | ||
| * It creates the following issues whenever a new process joins after the ephemeral node is gone. | ||
| * 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle | ||
| * 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems. | ||
| * As in interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the | ||
| * application lifecycle using {@link Latch#await(long, TimeUnit)} | ||
| * | ||
| * @param streamCreationLatch latch used for stream creation | ||
| * @return true if processor needs to be part of election | ||
| * false otherwise | ||
| */ | ||
| private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) { | ||
| boolean eligibleForElection = true; | ||
|
|
||
| try { | ||
| streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS); | ||
| // case we didn't time out suggesting that latch already exists | ||
| eligibleForElection = false; | ||
| } catch (TimeoutException e) { | ||
| LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams"); | ||
| } | ||
|
|
||
| return eligibleForElection; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.samza.zk; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Random; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.apache.samza.SamzaException; | ||
| import org.apache.samza.coordinator.DistributedLock; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| /** | ||
| * Distributed lock primitive for Zookeeper. | ||
| */ | ||
| public class ZkLock implements DistributedLock { | ||
|
|
||
| public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); | ||
| private final ZkUtils zkUtils; | ||
| private final String lockPath; | ||
| private final String participantId; | ||
| private final ZkKeyBuilder keyBuilder; | ||
| private final Random random = new Random(); | ||
| private String nodePath = null; | ||
|
|
||
| public ZkLock(String participantId, ZkUtils zkUtils, String initLockPath) { | ||
| this.zkUtils = zkUtils; | ||
| this.participantId = participantId; | ||
| this.keyBuilder = this.zkUtils.getKeyBuilder(); | ||
| lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); | ||
| zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); | ||
| } | ||
|
|
||
| /** | ||
| * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. | ||
| * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. | ||
| * @param timeout Duration of lock acquiring timeout. | ||
| * @param unit Unit of the timeout defined above. | ||
| * @return true if lock is acquired successfully, false if it times out. | ||
| */ | ||
| @Override | ||
| public boolean lock(long timeout, TimeUnit unit) { | ||
| nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); | ||
|
|
||
| //Start timer for timeout | ||
| long startTime = System.currentTimeMillis(); | ||
| long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); | ||
|
|
||
| while ((System.currentTimeMillis() - startTime) < lockTimeout) { | ||
| List<String> children = zkUtils.getZkClient().getChildren(lockPath); | ||
| int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); | ||
|
|
||
| if (children.size() == 0 || index == -1) { | ||
| throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see any error handling logic here (e.g. try to re-connect within the session expiration, etc.). Is it on the contract to the caller of DistributedLock.lock() to handle those exceptions? If yes, it would be necessary to define the exception types the DistributedLock.lock() method would throw and clearly state in the document what the caller supposed to do when catch those exceptions. Otherwise, the implementer of lock() should just handle the specific exceptions internally (i.e. re-try) until timeout. |
||
| } | ||
| // Acquires lock when the node has the lowest sequence number and returns. | ||
| if (index == 0) { | ||
| LOG.info("Acquired lock for participant id: {}", participantId); | ||
| return true; | ||
| } else { | ||
| try { | ||
| Thread.sleep(random.nextInt(1000)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought that in ZK implementation, we can use watch() on children? |
||
| } catch (InterruptedException e) { | ||
| Thread.interrupted(); | ||
| } | ||
| LOG.info("Trying to acquire lock again..."); | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /** | ||
| * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. | ||
| */ | ||
| @Override | ||
| public void unlock() { | ||
| if (nodePath != null) { | ||
| zkUtils.getZkClient().delete(nodePath); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, I would recommend to think it through about the zkClient lifecycle needed in this lock implementation. Are we assuming that the zkClient is initialized by some other class and just passed in to this lock implementation? Or are we assuming that the lock would internally own the lifecycle of the zkClient? If we are assuming the first case, I think that this DistributedLock implementation lack of handling on the failure cases: what if the zkClient connection to the ZK server was dropped after the user has called lock() and proceeded into the critical section? If the disconnect time is longer than the session timeout, there would be race conditions. How do the users of the DistributedLock being notified of this issue after calling the lock()? |
||
| nodePath = null; | ||
| LOG.info("Ephemeral lock node deleted. Unlocked!"); | ||
| } else { | ||
| LOG.warn("Ephemeral lock node you want to delete doesn't exist"); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A general comment: in a broader context, lockName sounds better than lockPath. lockPath is more ZK-specific.