diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 150b3d4fcc..1e4acd79d0 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -41,4 +41,6 @@ public interface CoordinationUtils { LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId Latch getLatch(int size, String latchId); + + DistributedLock getLock(String initLockPath); } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java new file mode 100644 index 0000000000..6972cd95ba --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +import java.util.concurrent.TimeUnit; + + +public interface DistributedLock { + + /** + * Tries to acquire the lock + * @param timeout Duration of lock acquiring timeout. + * @param unit Time Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. + */ + boolean lock(long timeout, TimeUnit unit); + + /** + * Releases the lock + */ + void unlock(); +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 588e657f0d..71b76bd37d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -36,9 +36,9 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -47,8 +47,7 @@ import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.task.TaskFactoryUtil; -import org.apache.samza.zk.ZkCoordinationServiceFactory; -import org.apache.samza.zk.ZkJobCoordinatorFactory; +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,9 +58,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"; - // Latch timeout is set to 10 min - private static final long LATCH_TIMEOUT_MINUTES = 10; - private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000; + // Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently. + private static final long LOCK_TIMEOUT_SECONDS = 10; + private static final String ZK_COORDINATION_CLASS = "org.apache.samza.zk.ZkJobCoordinatorFactory"; + private static final String ZK_COORDINATION_SERVICE_CLASS = "org.apache.samza.zk.ZkCoordinationServiceFactory"; private final String uid; private final Set processors = ConcurrentHashMap.newKeySet(); @@ -214,9 +214,9 @@ public void waitForFinish() { String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); // TODO: we will need a better way to package the configs with application runner - if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) { + if (ZK_COORDINATION_CLASS.equals(jobCoordinatorFactoryClassName)) { ApplicationConfig appConfig = new ApplicationConfig(config); - return new ZkCoordinationServiceFactory().getCoordinationService( + return Util.getObj(ZK_COORDINATION_SERVICE_CLASS).getCoordinationService( appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config); } else { return null; @@ -225,31 +225,28 @@ public void waitForFinish() { /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader - * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes - * stream creation. + * If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams. + * All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time. + * After stream creation, they will unlock and proceed normally. * @param planId a unique identifier representing the plan used for coordination purpose * @param intStreams list of intermediate {@link StreamSpec}s - * @throws TimeoutException exception for latch timeout + * @throws TimeoutException exception for lock timeout */ - /* package private */ void createStreams(String planId, List intStreams) throws TimeoutException { + /* package private */ void createStreams(String planId, List intStreams) throws TimeoutException { if (!intStreams.isEmpty()) { // Move the scope of coordination utils within stream creation to address long idle connection problem. // Refer SAMZA-1385 for more details CoordinationUtils coordinationUtils = createCoordinationUtils(); if (coordinationUtils != null) { - Latch initLatch = coordinationUtils.getLatch(1, planId); - + DistributedLock initLock = coordinationUtils.getLock(planId); try { - // check if the processor needs to go through leader election and stream creation - if (shouldContestInElectionForStreamCreation(initLatch)) { - LeaderElector leaderElector = coordinationUtils.getLeaderElector(); - leaderElector.setLeaderElectorListener(() -> { - getStreamManager().createStreams(intStreams); - initLatch.countDown(); - }); - leaderElector.tryBecomeLeader(); - initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); + boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (hasLock) { + getStreamManager().createStreams(intStreams); + LOG.info("Created intermediate streams. Unlocking now..."); + initLock.unlock(); + } else { + LOG.error("Timed out while trying to acquire lock.", new TimeoutException()); } } finally { coordinationUtils.reset(); @@ -285,31 +282,4 @@ StreamProcessor createStreamProcessor( taskFactory.getClass().getCanonicalName())); } } - - /** - * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying - * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election. - * It creates the following issues whenever a new process joins after the ephemeral node is gone. - * 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle - * 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems. - * As in interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the - * application lifecycle using {@link Latch#await(long, TimeUnit)} - * - * @param streamCreationLatch latch used for stream creation - * @return true if processor needs to be part of election - * false otherwise - */ - private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) { - boolean eligibleForElection = true; - - try { - streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS); - // case we didn't time out suggesting that latch already exists - eligibleForElection = false; - } catch (TimeoutException e) { - LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams"); - } - - return eligibleForElection; - } -} +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index f5dda2e880..4b50d80b34 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -23,6 +23,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,11 @@ public Latch getLatch(int size, String latchId) { return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils); } + @Override + public DistributedLock getLock(String initLockPath) { + return new ZkLock(processorIdStr, zkUtils, initLockPath); + } + // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer public ZkUtils getZkUtils() { return zkUtils; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java new file mode 100644 index 0000000000..b8ceda490d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.DistributedLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed lock primitive for Zookeeper. + */ +public class ZkLock implements DistributedLock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); + private final ZkUtils zkUtils; + private final String lockPath; + private final String participantId; + private final ZkKeyBuilder keyBuilder; + private final Random random = new Random(); + private String nodePath = null; + + public ZkLock(String participantId, ZkUtils zkUtils, String initLockPath) { + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = this.zkUtils.getKeyBuilder(); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); + zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); + } + + /** + * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. + */ + @Override + public boolean lock(long timeout, TimeUnit unit) { + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); + + //Start timer for timeout + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + + while ((System.currentTimeMillis() - startTime) < lockTimeout) { + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + // Acquires lock when the node has the lowest sequence number and returns. + if (index == 0) { + LOG.info("Acquired lock for participant id: {}", participantId); + return true; + } else { + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Trying to acquire lock again..."); + } + } + return false; + } + + /** + * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. + */ + @Override + public void unlock() { + if (nodePath != null) { + zkUtils.getZkClient().delete(nodePath); + nodePath = null; + LOG.info("Ephemeral lock node deleted. Unlocked!"); + } else { + LOG.warn("Ephemeral lock node you want to delete doesn't exist"); + } + } +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 6c7827e9ac..b84d4fce8e 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; @@ -34,9 +33,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.LeaderElectorListener; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.execution.ExecutionPlanner; import org.apache.samza.execution.StreamManager; @@ -48,7 +45,6 @@ import org.mockito.ArgumentCaptor; import static org.junit.Assert.*; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; @@ -162,46 +158,18 @@ public String getPlanAsJson() LocalApplicationRunner spy = spy(runner); CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); - LeaderElector leaderElector = new LeaderElector() { - private LeaderElectorListener leaderElectorListener; + DistributedLock lock = new DistributedLock() { @Override - public void setLeaderElectorListener(LeaderElectorListener listener) { - this.leaderElectorListener = listener; + public boolean lock(long timeout, TimeUnit unit) { + return true; } @Override - public void tryBecomeLeader() { - leaderElectorListener.onBecomingLeader(); - } - - @Override - public void resignLeadership() {} - - @Override - public boolean amILeader() { - return false; - } + public void unlock() {} }; - Latch latch = new Latch() { - boolean done = false; - @Override - public void await(long timeout, TimeUnit tu) - throws TimeoutException { - // in this test, latch is released after countDown is invoked - if (!done) { - throw new TimeoutException("timed out waiting for the target path"); - } - } - - @Override - public void countDown() { - done = true; - } - }; - when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector); - when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch); + when(coordinationUtils.getLock(anyString())).thenReturn(lock); doReturn(coordinationUtils).when(spy).createCoordinationUtils(); try { @@ -361,7 +329,7 @@ public String getPlanAsJson() @Test public void testPlanIdWithShuffledStreamSpecs() { List streamSpecs = ImmutableList.of( - new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-1", "stream-1", "testStream"), new StreamSpec("test-stream-2", "stream-2", "testStream"), new StreamSpec("test-stream-3", "stream-3", "testStream")); String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); @@ -430,4 +398,4 @@ private String streamSpecToJson(StreamSpec streamSpec) { streamSpec.getSystemName(), streamSpec.getPhysicalName()); } -} +} \ No newline at end of file diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 1e59b61e3c..b8d204b1cf 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -518,7 +518,7 @@ class KafkaSystemAdmin( */ override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) - } + } /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2;