From 8fb1ef61f2944994358f1e39dd2a423f3bd5d6b4 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Tue, 8 Aug 2017 10:33:26 -0700 Subject: [PATCH 01/12] Add lock --- .../org/apache/samza/coordinator/Lock.java | 38 +++++ .../samza/coordinator/LockListener.java | 30 ++++ .../main/java/org/apache/samza/zk/ZkLock.java | 152 ++++++++++++++++++ .../java/org/apache/samza/zk/TestZkLock.java | 23 +++ 4 files changed, 243 insertions(+) create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/Lock.java create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java create mode 100644 samza-core/src/main/java/org/apache/samza/zk/ZkLock.java create mode 100644 samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java new file mode 100644 index 0000000000..27e8437775 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +public interface Lock { + + /** + * Acquires the lock + */ + void lock(); + + /** + * Releases the lock + */ + void unlock(); + + boolean hasLock(); + + void setLockListener(LockListener listener); + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java new file mode 100644 index 0000000000..f80da793e6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +public interface LockListener { + + public void onAcquiringLock(); + + /** + * Perform the necessary operation when a notification about acquire lock/release lock/error has been received. + */ + public void onError(); + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java new file mode 100644 index 0000000000..d34f7bf8fd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.LockListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkLock implements Lock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class); + private final ZkUtils zkUtils; + private final String lockPath; + private final String participantId; + private String currentSubscription = null; + private final ZkKeyBuilder keyBuilder; + private final IZkDataListener previousProcessorChangeListener; + private final Random random = new Random(); + private String nodePath = null; + private LockListener zkLockListener = null; + private AtomicBoolean hasLock; + private final static String LOCK_PATH = "lock"; + + public ZkLock(String participantId, ZkUtils zkUtils) { + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = this.zkUtils.getKeyBuilder(); + this.previousProcessorChangeListener = new PreviousLockProcessorChangeListener(); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); + zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); + this.hasLock = new AtomicBoolean(false); + } + + /** + * Create a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + */ + @Override + public void lock() { + try { + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); + } catch (Exception e) { + zkLockListener.onError(); + } + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + + if (index == 0) { + hasLock.set(true); + if (zkLockListener != null) { + zkLockListener.onAcquiringLock(); + } + } else { + String predecessor = children.get(index - 1); + if (!predecessor.equals(currentSubscription)) { + if (currentSubscription != null) { + zkUtils.unsubscribeDataChanges(lockPath + "/" + currentSubscription, + previousProcessorChangeListener); + } + currentSubscription = predecessor; + zkUtils.subscribeDataChanges(lockPath + "/" + currentSubscription, + previousProcessorChangeListener); + } + /** + * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes + * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't + * exist during subscription, it is not going to get created in the future. + */ + boolean predecessorExists = zkUtils.exists(lockPath + "/" + currentSubscription); + if (predecessorExists) { + LOG.info("Predecessor still exists. Current subscription is valid. Continuing as non-lockholder."); + } else { + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); + lock(); + } + } + } + + + /** + * Delete the node created to acquire the lock + */ + @Override + public void unlock() { + if (nodePath != null) { + Boolean status = false; + while (!status && nodePath != null) { + status = zkUtils.getZkClient().delete(nodePath); + } + hasLock.set(false); + nodePath = null; + LOG.info("Ephemeral lock node deleted. Unlocked!"); + } else { + LOG.info("Ephemeral lock node doesn't exist"); + } + } + + @Override + public boolean hasLock() { + return hasLock.get(); + } + + @Override + public void setLockListener(LockListener listener) { + this.zkLockListener = listener; + } + + class PreviousLockProcessorChangeListener implements IZkDataListener { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + LOG.debug("Data change on path: " + dataPath + " Data: " + data); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + LOG.info("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."); + lock(); + } + } + +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java new file mode 100644 index 0000000000..50d55702a6 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +public class TestZkLock { +} From 8ec4e5982d1045b1090e8e3d762e5f402b4ecaca Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 11:54:08 -0700 Subject: [PATCH 02/12] Add create streams with lock --- .../org/apache/samza/system/SystemAdmin.java | 9 ++++++++ .../samza/coordinator/CoordinationUtils.java | 2 ++ .../apache/samza/execution/StreamManager.java | 15 ++++++++++++ .../samza/runtime/LocalApplicationRunner.java | 10 ++++++++ .../apache/samza/zk/ZkCoordinationUtils.java | 6 +++++ .../main/java/org/apache/samza/zk/ZkLock.java | 23 +++++++++++++------ .../samza/system/kafka/KafkaSystemAdmin.scala | 10 ++++++++ 7 files changed, 68 insertions(+), 7 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index b18071243d..983c4648d9 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,4 +114,13 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } + + /** + * Check if the stream described by the spec already exists. + * @param streamSpec The spec, or blueprint for the physical stream on the system. + * @return true if stream exists already, false otherwise + */ + default boolean existStream(StreamSpec streamSpec) { + return false; + } } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 150b3d4fcc..32e4f6c4bd 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -41,4 +41,6 @@ public interface CoordinationUtils { LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId Latch getLatch(int size, String latchId); + + Lock getLock(); } diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c6ab036228..c2eb78a7e6 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,4 +75,19 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } + + /** + * Check if the streams described by the specs already exist. + * @param streams A list of stream specs, whose existence we need to check for + * @return true if all the streams exist already, false otherwise + */ + public boolean checkIfStreamsExist(List streams) { + for (StreamSpec spec: streams) { + SystemAdmin systemAdmin = sysAdmins.get(spec.getSystemName()); + if (!systemAdmin.existStream(spec)) { + return false; + } + } + return true; + } } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 588e657f0d..77712975ff 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -39,6 +39,8 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.LockListener; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -262,6 +264,14 @@ public void waitForFinish() { } } + + private void createStream(List intStreams) throws Exception { + boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); + if (!streamsExist) { + getStreamManager().createStreams(intStreams); + } + } + /** * Create {@link StreamProcessor} based on {@link StreamApplication} and the config * @param config config diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index f5dda2e880..aa366d7dea 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -23,6 +23,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.Lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,11 @@ public Latch getLatch(int size, String latchId) { return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils); } + @Override + public Lock getLock() { + return new ZkLock(processorIdStr, zkUtils); + } + // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer public ZkUtils getZkUtils() { return zkUtils; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index d34f7bf8fd..40567de457 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -29,20 +29,23 @@ import org.slf4j.LoggerFactory; +/** + * Lock primitive for Zookeeper. + */ public class ZkLock implements Lock { - public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class); + public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); + private final static String LOCK_PATH = "lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; - private String currentSubscription = null; private final ZkKeyBuilder keyBuilder; private final IZkDataListener previousProcessorChangeListener; private final Random random = new Random(); + private String currentSubscription = null; private String nodePath = null; private LockListener zkLockListener = null; private AtomicBoolean hasLock; - private final static String LOCK_PATH = "lock"; public ZkLock(String participantId, ZkUtils zkUtils) { this.zkUtils = zkUtils; @@ -55,14 +58,17 @@ public ZkLock(String participantId, ZkUtils zkUtils) { } /** - * Create a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * Create a sequential ephemeral node to acquire the lock. + * If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override public void lock() { try { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); } catch (Exception e) { - zkLockListener.onError(); + if (zkLockListener != null) { + zkLockListener.onError(); + } } List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -72,9 +78,12 @@ public void lock() { } if (index == 0) { + LOG.info("Acquired lock for participant id: {}", participantId); hasLock.set(true); if (zkLockListener != null) { zkLockListener.onAcquiringLock(); + } else { + throw new SamzaException("LockListener unassigned."); } } else { String predecessor = children.get(index - 1); @@ -109,13 +118,13 @@ public void lock() { /** - * Delete the node created to acquire the lock + * Delete the ephemeral sequential node created to acquire the lock. */ @Override public void unlock() { if (nodePath != null) { Boolean status = false; - while (!status && nodePath != null) { + while (!status) { status = zkUtils.getZkClient().delete(nodePath); } hasLock.set(false); diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 1e59b61e3c..76b668e819 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -520,6 +520,16 @@ class KafkaSystemAdmin( validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) } + /** + * Checks for the existence of a stream defined by its spec, in Kafka. + * @param spec The spec, or blueprint from which the physical stream is created on the system. + */ + override def existStream(spec: StreamSpec): Boolean = { + val topicName = spec.getPhysicalName() + val zkClient = connectZk() + return AdminUtils.topicExists(zkClient, topicName) + } + /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. From 9750f2d532d38da353c1edfa53f2c4a573827ebc Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 12:29:41 -0700 Subject: [PATCH 03/12] Fix retry --- .../src/main/java/org/apache/samza/coordinator/Lock.java | 2 +- .../java/org/apache/samza/runtime/LocalApplicationRunner.java | 1 - samza-core/src/main/java/org/apache/samza/zk/ZkLock.java | 4 +++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java index 27e8437775..344de9838e 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java @@ -24,7 +24,7 @@ public interface Lock { /** * Acquires the lock */ - void lock(); + boolean lock(); /** * Releases the lock diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 77712975ff..23210243c2 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -311,7 +311,6 @@ StreamProcessor createStreamProcessor( */ private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) { boolean eligibleForElection = true; - try { streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS); // case we didn't time out suggesting that latch already exists diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 40567de457..5b09286a54 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -62,7 +62,7 @@ public ZkLock(String participantId, ZkUtils zkUtils) { * If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override - public void lock() { + public boolean lock() { try { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); } catch (Exception e) { @@ -82,6 +82,7 @@ public void lock() { hasLock.set(true); if (zkLockListener != null) { zkLockListener.onAcquiringLock(); + return true; } else { throw new SamzaException("LockListener unassigned."); } @@ -113,6 +114,7 @@ public void lock() { LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); lock(); } + return false; } } From 83a964480f55b84217269f674eb5223e07689e6e Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 17:25:31 -0700 Subject: [PATCH 04/12] Add lock timeout --- .../samza/runtime/LocalApplicationRunner.java | 3 +- .../apache/samza/zk/ZkCoordinationUtils.java | 2 +- .../main/java/org/apache/samza/zk/ZkLock.java | 102 ++++++------------ 3 files changed, 36 insertions(+), 71 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 23210243c2..f94bf173d4 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -265,7 +265,7 @@ public void waitForFinish() { } - private void createStream(List intStreams) throws Exception { + private void createIntermediateStreams(List intStreams) throws Exception { boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); if (!streamsExist) { getStreamManager().createStreams(intStreams); @@ -318,7 +318,6 @@ private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLat } catch (TimeoutException e) { LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams"); } - return eligibleForElection; } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index aa366d7dea..5cfc3f31e9 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -63,7 +63,7 @@ public Latch getLatch(int size, String latchId) { @Override public Lock getLock() { - return new ZkLock(processorIdStr, zkUtils); + return new ZkLock(processorIdStr, zkUtils, zkConfig); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 5b09286a54..2714955ff2 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -18,11 +18,15 @@ */ package org.apache.samza.zk; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.List; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.I0Itec.zkclient.IZkDataListener; import org.apache.samza.SamzaException; +import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.Lock; import org.apache.samza.coordinator.LockListener; import org.slf4j.Logger; @@ -36,99 +40,74 @@ public class ZkLock implements Lock { public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); private final static String LOCK_PATH = "lock"; + private final ZkConfig zkConfig; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; - private final IZkDataListener previousProcessorChangeListener; private final Random random = new Random(); - private String currentSubscription = null; private String nodePath = null; private LockListener zkLockListener = null; private AtomicBoolean hasLock; - public ZkLock(String participantId, ZkUtils zkUtils) { + public ZkLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { + this.zkConfig = zkConfig; this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); - this.previousProcessorChangeListener = new PreviousLockProcessorChangeListener(); lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); this.hasLock = new AtomicBoolean(false); } /** - * Create a sequential ephemeral node to acquire the lock. - * If the path of this node has the lowest sequence number, the processor has acquired the lock. + * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override public boolean lock() { - try { - nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - } catch (Exception e) { - if (zkLockListener != null) { - zkLockListener.onError(); - } - } - List children = zkUtils.getZkClient().getChildren(lockPath); - int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - if (children.size() == 0 || index == -1) { - throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); - } - - if (index == 0) { - LOG.info("Acquired lock for participant id: {}", participantId); - hasLock.set(true); - if (zkLockListener != null) { - zkLockListener.onAcquiringLock(); - return true; - } else { - throw new SamzaException("LockListener unassigned."); - } - } else { - String predecessor = children.get(index - 1); - if (!predecessor.equals(currentSubscription)) { - if (currentSubscription != null) { - zkUtils.unsubscribeDataChanges(lockPath + "/" + currentSubscription, - previousProcessorChangeListener); + //Start timer for timeout + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build()); + scheduler.schedule(() -> { + if (!hasLock.get()) { + throw new SamzaException("Timed out while acquiring lock for creating intermediate streams."); } - currentSubscription = predecessor; - zkUtils.subscribeDataChanges(lockPath + "/" + currentSubscription, - previousProcessorChangeListener); + }, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS); + + while (!hasLock.get()) { + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); } - /** - * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes - * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't - * exist during subscription, it is not going to get created in the future. - */ - boolean predecessorExists = zkUtils.exists(lockPath + "/" + currentSubscription); - if (predecessorExists) { - LOG.info("Predecessor still exists. Current subscription is valid. Continuing as non-lockholder."); + // Acquires lock when the node has the lowest sequence number and returns. + if (index == 0) { + hasLock.set(true); + LOG.info("Acquired lock for participant id: {}", participantId); + return true; } else { + // Keep trying to acquire the lock try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { Thread.interrupted(); } - LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); - lock(); + LOG.info("Trying to acquire lock again..."); } - return false; } + return hasLock.get(); } - /** - * Delete the ephemeral sequential node created to acquire the lock. + * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. */ @Override public void unlock() { if (nodePath != null) { - Boolean status = false; - while (!status) { - status = zkUtils.getZkClient().delete(nodePath); - } + zkUtils.getZkClient().delete(nodePath); hasLock.set(false); nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); @@ -147,17 +126,4 @@ public void setLockListener(LockListener listener) { this.zkLockListener = listener; } - class PreviousLockProcessorChangeListener implements IZkDataListener { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - LOG.debug("Data change on path: " + dataPath + " Data: " + data); - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - LOG.info("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."); - lock(); - } - } - } \ No newline at end of file From afbddac764d108026a09339250bf887647cc8d3d Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Fri, 11 Aug 2017 17:02:18 -0700 Subject: [PATCH 05/12] Change lock API --- .../org/apache/samza/system/SystemAdmin.java | 11 +--- .../samza/coordinator/CoordinationUtils.java | 2 +- .../{Lock.java => DistributedLock.java} | 17 +++--- .../samza/coordinator/LockListener.java | 30 ---------- .../apache/samza/execution/StreamManager.java | 17 +----- .../samza/runtime/LocalApplicationRunner.java | 13 +---- .../apache/samza/zk/ZkCoordinationUtils.java | 6 +- .../{ZkLock.java => ZkDistributedLock.java} | 57 ++++++------------- .../java/org/apache/samza/zk/TestZkLock.java | 23 -------- .../samza/system/kafka/KafkaSystemAdmin.scala | 10 ---- 10 files changed, 35 insertions(+), 151 deletions(-) rename samza-core/src/main/java/org/apache/samza/coordinator/{Lock.java => DistributedLock.java} (73%) delete mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java rename samza-core/src/main/java/org/apache/samza/zk/{ZkLock.java => ZkDistributedLock.java} (67%) delete mode 100644 samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 983c4648d9..27eef5fb34 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,13 +114,4 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } - - /** - * Check if the stream described by the spec already exists. - * @param streamSpec The spec, or blueprint for the physical stream on the system. - * @return true if stream exists already, false otherwise - */ - default boolean existStream(StreamSpec streamSpec) { - return false; - } -} +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 32e4f6c4bd..23afe4e92a 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -42,5 +42,5 @@ public interface CoordinationUtils { Latch getLatch(int size, String latchId); - Lock getLock(); + DistributedLock getLock(); } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java similarity index 73% rename from samza-core/src/main/java/org/apache/samza/coordinator/Lock.java rename to samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java index 344de9838e..6972cd95ba 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java @@ -19,20 +19,21 @@ package org.apache.samza.coordinator; -public interface Lock { +import java.util.concurrent.TimeUnit; + + +public interface DistributedLock { /** - * Acquires the lock + * Tries to acquire the lock + * @param timeout Duration of lock acquiring timeout. + * @param unit Time Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. */ - boolean lock(); + boolean lock(long timeout, TimeUnit unit); /** * Releases the lock */ void unlock(); - - boolean hasLock(); - - void setLockListener(LockListener listener); - } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java deleted file mode 100644 index f80da793e6..0000000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.coordinator; - -public interface LockListener { - - public void onAcquiringLock(); - - /** - * Perform the necessary operation when a notification about acquire lock/release lock/error has been received. - */ - public void onError(); - -} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c2eb78a7e6..c290d31b2f 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,19 +75,4 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } - - /** - * Check if the streams described by the specs already exist. - * @param streams A list of stream specs, whose existence we need to check for - * @return true if all the streams exist already, false otherwise - */ - public boolean checkIfStreamsExist(List streams) { - for (StreamSpec spec: streams) { - SystemAdmin systemAdmin = sysAdmins.get(spec.getSystemName()); - if (!systemAdmin.existStream(spec)) { - return false; - } - } - return true; - } -} +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index f94bf173d4..905e8c249e 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -37,10 +37,9 @@ import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.Lock; -import org.apache.samza.coordinator.LockListener; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -264,14 +263,6 @@ public void waitForFinish() { } } - - private void createIntermediateStreams(List intStreams) throws Exception { - boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); - if (!streamsExist) { - getStreamManager().createStreams(intStreams); - } - } - /** * Create {@link StreamProcessor} based on {@link StreamApplication} and the config * @param config config @@ -320,4 +311,4 @@ private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLat } return eligibleForElection; } -} +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index 5cfc3f31e9..b84d2a4626 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -23,7 +23,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +62,8 @@ public Latch getLatch(int size, String latchId) { } @Override - public Lock getLock() { - return new ZkLock(processorIdStr, zkUtils, zkConfig); + public DistributedLock getLock() { + return new ZkDistributedLock(processorIdStr, zkUtils, zkConfig); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java similarity index 67% rename from samza-core/src/main/java/org/apache/samza/zk/ZkLock.java rename to samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java index 2714955ff2..4b7b138392 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java @@ -18,65 +18,54 @@ */ package org.apache.samza.zk; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.List; import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.samza.SamzaException; import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.Lock; -import org.apache.samza.coordinator.LockListener; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Lock primitive for Zookeeper. + * Distributed lock primitive for Zookeeper. */ -public class ZkLock implements Lock { +public class ZkDistributedLock implements DistributedLock { - public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); - private final static String LOCK_PATH = "lock"; - private final ZkConfig zkConfig; + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); + private final static String LOCK_PATH = "distributed_lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; private final Random random = new Random(); private String nodePath = null; - private LockListener zkLockListener = null; - private AtomicBoolean hasLock; - public ZkLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { - this.zkConfig = zkConfig; + public ZkDistributedLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); - this.hasLock = new AtomicBoolean(false); } /** * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. */ @Override - public boolean lock() { + public boolean lock(long timeout, TimeUnit unit) { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); //Start timer for timeout - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build()); - scheduler.schedule(() -> { - if (!hasLock.get()) { - throw new SamzaException("Timed out while acquiring lock for creating intermediate streams."); - } - }, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS); + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - while (!hasLock.get()) { + while (true) { List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -85,11 +74,14 @@ public boolean lock() { } // Acquires lock when the node has the lowest sequence number and returns. if (index == 0) { - hasLock.set(true); LOG.info("Acquired lock for participant id: {}", participantId); return true; } else { // Keep trying to acquire the lock + long currentTime = System.currentTimeMillis(); + if ((currentTime - startTime) >= lockTimeout) { + return false; + } try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { @@ -98,7 +90,6 @@ public boolean lock() { LOG.info("Trying to acquire lock again..."); } } - return hasLock.get(); } /** @@ -108,22 +99,10 @@ public boolean lock() { public void unlock() { if (nodePath != null) { zkUtils.getZkClient().delete(nodePath); - hasLock.set(false); nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); } else { - LOG.info("Ephemeral lock node doesn't exist"); + LOG.error("Ephemeral lock node you want to delete doesn't exist"); } } - - @Override - public boolean hasLock() { - return hasLock.get(); - } - - @Override - public void setLockListener(LockListener listener) { - this.zkLockListener = listener; - } - } \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java deleted file mode 100644 index 50d55702a6..0000000000 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.zk; - -public class TestZkLock { -} diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 76b668e819..1e59b61e3c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -520,16 +520,6 @@ class KafkaSystemAdmin( validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) } - /** - * Checks for the existence of a stream defined by its spec, in Kafka. - * @param spec The spec, or blueprint from which the physical stream is created on the system. - */ - override def existStream(spec: StreamSpec): Boolean = { - val topicName = spec.getPhysicalName() - val zkClient = connectZk() - return AdminUtils.topicExists(zkClient, topicName) - } - /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. From 03f1bc7adb1878e53161213bbd5a84e6457f7de8 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Fri, 11 Aug 2017 18:50:17 -0700 Subject: [PATCH 06/12] Address review --- .../org/apache/samza/system/SystemAdmin.java | 2 +- .../samza/coordinator/CoordinationUtils.java | 2 +- .../apache/samza/execution/StreamManager.java | 2 +- .../samza/runtime/LocalApplicationRunner.java | 63 +++++++------------ .../apache/samza/zk/ZkCoordinationUtils.java | 4 +- .../apache/samza/zk/ZkDistributedLock.java | 16 ++--- 6 files changed, 33 insertions(+), 56 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 27eef5fb34..b18071243d 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,4 +114,4 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } -} \ No newline at end of file +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 23afe4e92a..1e4acd79d0 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -42,5 +42,5 @@ public interface CoordinationUtils { Latch getLatch(int size, String latchId); - DistributedLock getLock(); + DistributedLock getLock(String initLockPath); } diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c290d31b2f..c6ab036228 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,4 +75,4 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } -} \ No newline at end of file +} diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 905e8c249e..cf104f0841 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; @@ -60,9 +62,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"; - // Latch timeout is set to 10 min - private static final long LATCH_TIMEOUT_MINUTES = 10; - private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000; + + private static final long LOCK_TIMEOUT_SECONDS = 10; private final String uid; private final Set processors = ConcurrentHashMap.newKeySet(); @@ -224,6 +225,18 @@ public void waitForFinish() { } } + /** + * Generates a unique lock ID which is consistent for all processors within the same application lifecycle. + * Each {@code StreamSpec} has an ID that is unique and is used for hashcode computation. + * @param intStreams list of {@link StreamSpec}s + * @return lock ID + */ + private String generateLockId(List intStreams) { + return String.valueOf(Objects.hashCode(intStreams.stream() + .map(StreamSpec::getId) + .collect(Collectors.toList()))); + } + /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader @@ -231,7 +244,7 @@ public void waitForFinish() { * stream creation. * @param planId a unique identifier representing the plan used for coordination purpose * @param intStreams list of intermediate {@link StreamSpec}s - * @throws TimeoutException exception for latch timeout + * @throws TimeoutException exception for lock timeout */ /* package private */ void createStreams(String planId, List intStreams) throws TimeoutException { if (!intStreams.isEmpty()) { @@ -239,18 +252,13 @@ public void waitForFinish() { // Refer SAMZA-1385 for more details CoordinationUtils coordinationUtils = createCoordinationUtils(); if (coordinationUtils != null) { - Latch initLatch = coordinationUtils.getLatch(1, planId); - + DistributedLock initLock = coordinationUtils.getLock(generateLockId(intStreams)); try { - // check if the processor needs to go through leader election and stream creation - if (shouldContestInElectionForStreamCreation(initLatch)) { - LeaderElector leaderElector = coordinationUtils.getLeaderElector(); - leaderElector.setLeaderElectorListener(() -> { - getStreamManager().createStreams(intStreams); - initLatch.countDown(); - }); - leaderElector.tryBecomeLeader(); - initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); + boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (hasLock) { + getStreamManager().createStreams(intStreams); + } else { + LOG.error("Timed out while trying to acquire lock.", new TimeoutException()); } } finally { coordinationUtils.reset(); @@ -286,29 +294,4 @@ StreamProcessor createStreamProcessor( taskFactory.getClass().getCanonicalName())); } } - - /** - * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying - * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election. - * It creates the following issues whenever a new process joins after the ephemeral node is gone. - * 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle - * 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems. - * As in interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the - * application lifecycle using {@link Latch#await(long, TimeUnit)} - * - * @param streamCreationLatch latch used for stream creation - * @return true if processor needs to be part of election - * false otherwise - */ - private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) { - boolean eligibleForElection = true; - try { - streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS); - // case we didn't time out suggesting that latch already exists - eligibleForElection = false; - } catch (TimeoutException e) { - LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams"); - } - return eligibleForElection; - } } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index b84d2a4626..3fd7b75a60 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -62,8 +62,8 @@ public Latch getLatch(int size, String latchId) { } @Override - public DistributedLock getLock() { - return new ZkDistributedLock(processorIdStr, zkUtils, zkConfig); + public DistributedLock getLock(String initLockPath) { + return new ZkDistributedLock(processorIdStr, zkUtils, initLockPath); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java index 4b7b138392..71723739c6 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java @@ -22,7 +22,6 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.samza.SamzaException; -import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +33,6 @@ public class ZkDistributedLock implements DistributedLock { public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); - private final static String LOCK_PATH = "distributed_lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; @@ -42,11 +40,11 @@ public class ZkDistributedLock implements DistributedLock { private final Random random = new Random(); private String nodePath = null; - public ZkDistributedLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { + public ZkDistributedLock(String participantId, ZkUtils zkUtils, String initLockPath) { this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); - lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); } @@ -65,7 +63,7 @@ public boolean lock(long timeout, TimeUnit unit) { long startTime = System.currentTimeMillis(); long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - while (true) { + while ((System.currentTimeMillis() - startTime) < lockTimeout) { List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -77,11 +75,6 @@ public boolean lock(long timeout, TimeUnit unit) { LOG.info("Acquired lock for participant id: {}", participantId); return true; } else { - // Keep trying to acquire the lock - long currentTime = System.currentTimeMillis(); - if ((currentTime - startTime) >= lockTimeout) { - return false; - } try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { @@ -90,6 +83,7 @@ public boolean lock(long timeout, TimeUnit unit) { LOG.info("Trying to acquire lock again..."); } } + return false; } /** @@ -102,7 +96,7 @@ public void unlock() { nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); } else { - LOG.error("Ephemeral lock node you want to delete doesn't exist"); + LOG.warn("Ephemeral lock node you want to delete doesn't exist"); } } } \ No newline at end of file From 774903ab7eb5501d5537d90c9d47e025a5146577 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Tue, 8 Aug 2017 10:33:26 -0700 Subject: [PATCH 07/12] Add lock --- .../org/apache/samza/coordinator/Lock.java | 38 +++++ .../samza/coordinator/LockListener.java | 30 ++++ .../main/java/org/apache/samza/zk/ZkLock.java | 152 ++++++++++++++++++ .../java/org/apache/samza/zk/TestZkLock.java | 23 +++ 4 files changed, 243 insertions(+) create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/Lock.java create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java create mode 100644 samza-core/src/main/java/org/apache/samza/zk/ZkLock.java create mode 100644 samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java new file mode 100644 index 0000000000..27e8437775 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +public interface Lock { + + /** + * Acquires the lock + */ + void lock(); + + /** + * Releases the lock + */ + void unlock(); + + boolean hasLock(); + + void setLockListener(LockListener listener); + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java new file mode 100644 index 0000000000..f80da793e6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +public interface LockListener { + + public void onAcquiringLock(); + + /** + * Perform the necessary operation when a notification about acquire lock/release lock/error has been received. + */ + public void onError(); + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java new file mode 100644 index 0000000000..d34f7bf8fd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.LockListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkLock implements Lock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class); + private final ZkUtils zkUtils; + private final String lockPath; + private final String participantId; + private String currentSubscription = null; + private final ZkKeyBuilder keyBuilder; + private final IZkDataListener previousProcessorChangeListener; + private final Random random = new Random(); + private String nodePath = null; + private LockListener zkLockListener = null; + private AtomicBoolean hasLock; + private final static String LOCK_PATH = "lock"; + + public ZkLock(String participantId, ZkUtils zkUtils) { + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = this.zkUtils.getKeyBuilder(); + this.previousProcessorChangeListener = new PreviousLockProcessorChangeListener(); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); + zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); + this.hasLock = new AtomicBoolean(false); + } + + /** + * Create a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + */ + @Override + public void lock() { + try { + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); + } catch (Exception e) { + zkLockListener.onError(); + } + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + + if (index == 0) { + hasLock.set(true); + if (zkLockListener != null) { + zkLockListener.onAcquiringLock(); + } + } else { + String predecessor = children.get(index - 1); + if (!predecessor.equals(currentSubscription)) { + if (currentSubscription != null) { + zkUtils.unsubscribeDataChanges(lockPath + "/" + currentSubscription, + previousProcessorChangeListener); + } + currentSubscription = predecessor; + zkUtils.subscribeDataChanges(lockPath + "/" + currentSubscription, + previousProcessorChangeListener); + } + /** + * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes + * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't + * exist during subscription, it is not going to get created in the future. + */ + boolean predecessorExists = zkUtils.exists(lockPath + "/" + currentSubscription); + if (predecessorExists) { + LOG.info("Predecessor still exists. Current subscription is valid. Continuing as non-lockholder."); + } else { + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); + lock(); + } + } + } + + + /** + * Delete the node created to acquire the lock + */ + @Override + public void unlock() { + if (nodePath != null) { + Boolean status = false; + while (!status && nodePath != null) { + status = zkUtils.getZkClient().delete(nodePath); + } + hasLock.set(false); + nodePath = null; + LOG.info("Ephemeral lock node deleted. Unlocked!"); + } else { + LOG.info("Ephemeral lock node doesn't exist"); + } + } + + @Override + public boolean hasLock() { + return hasLock.get(); + } + + @Override + public void setLockListener(LockListener listener) { + this.zkLockListener = listener; + } + + class PreviousLockProcessorChangeListener implements IZkDataListener { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + LOG.debug("Data change on path: " + dataPath + " Data: " + data); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + LOG.info("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."); + lock(); + } + } + +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java new file mode 100644 index 0000000000..50d55702a6 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +public class TestZkLock { +} From 80503f8ade5fc457d17eb95580a81c3a66ecfd10 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 11:54:08 -0700 Subject: [PATCH 08/12] Add create streams with lock --- .../org/apache/samza/system/SystemAdmin.java | 9 ++++++++ .../apache/samza/execution/StreamManager.java | 15 ++++++++++++ .../samza/runtime/LocalApplicationRunner.java | 6 ++--- .../main/java/org/apache/samza/zk/ZkLock.java | 23 +++++++++++++------ .../samza/system/kafka/KafkaSystemAdmin.scala | 10 ++++++++ 5 files changed, 52 insertions(+), 11 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index b18071243d..983c4648d9 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,4 +114,13 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } + + /** + * Check if the stream described by the spec already exists. + * @param streamSpec The spec, or blueprint for the physical stream on the system. + * @return true if stream exists already, false otherwise + */ + default boolean existStream(StreamSpec streamSpec) { + return false; + } } diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c6ab036228..c2eb78a7e6 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,4 +75,19 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } + + /** + * Check if the streams described by the specs already exist. + * @param streams A list of stream specs, whose existence we need to check for + * @return true if all the streams exist already, false otherwise + */ + public boolean checkIfStreamsExist(List streams) { + for (StreamSpec spec: streams) { + SystemAdmin systemAdmin = sysAdmins.get(spec.getSystemName()); + if (!systemAdmin.existStream(spec)) { + return false; + } + } + return true; + } } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index cf104f0841..0339d139fe 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -40,8 +40,7 @@ import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.DistributedLock; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.Lock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -62,9 +61,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"; - + // Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently. private static final long LOCK_TIMEOUT_SECONDS = 10; - private final String uid; private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index d34f7bf8fd..40567de457 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -29,20 +29,23 @@ import org.slf4j.LoggerFactory; +/** + * Lock primitive for Zookeeper. + */ public class ZkLock implements Lock { - public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class); + public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); + private final static String LOCK_PATH = "lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; - private String currentSubscription = null; private final ZkKeyBuilder keyBuilder; private final IZkDataListener previousProcessorChangeListener; private final Random random = new Random(); + private String currentSubscription = null; private String nodePath = null; private LockListener zkLockListener = null; private AtomicBoolean hasLock; - private final static String LOCK_PATH = "lock"; public ZkLock(String participantId, ZkUtils zkUtils) { this.zkUtils = zkUtils; @@ -55,14 +58,17 @@ public ZkLock(String participantId, ZkUtils zkUtils) { } /** - * Create a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * Create a sequential ephemeral node to acquire the lock. + * If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override public void lock() { try { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); } catch (Exception e) { - zkLockListener.onError(); + if (zkLockListener != null) { + zkLockListener.onError(); + } } List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -72,9 +78,12 @@ public void lock() { } if (index == 0) { + LOG.info("Acquired lock for participant id: {}", participantId); hasLock.set(true); if (zkLockListener != null) { zkLockListener.onAcquiringLock(); + } else { + throw new SamzaException("LockListener unassigned."); } } else { String predecessor = children.get(index - 1); @@ -109,13 +118,13 @@ public void lock() { /** - * Delete the node created to acquire the lock + * Delete the ephemeral sequential node created to acquire the lock. */ @Override public void unlock() { if (nodePath != null) { Boolean status = false; - while (!status && nodePath != null) { + while (!status) { status = zkUtils.getZkClient().delete(nodePath); } hasLock.set(false); diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 1e59b61e3c..76b668e819 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -520,6 +520,16 @@ class KafkaSystemAdmin( validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) } + /** + * Checks for the existence of a stream defined by its spec, in Kafka. + * @param spec The spec, or blueprint from which the physical stream is created on the system. + */ + override def existStream(spec: StreamSpec): Boolean = { + val topicName = spec.getPhysicalName() + val zkClient = connectZk() + return AdminUtils.topicExists(zkClient, topicName) + } + /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. From 23849c85429844b7a6c03c4f74c8ecab4bf22c19 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 12:29:41 -0700 Subject: [PATCH 09/12] Fix retry --- .../src/main/java/org/apache/samza/coordinator/Lock.java | 2 +- .../java/org/apache/samza/runtime/LocalApplicationRunner.java | 1 - samza-core/src/main/java/org/apache/samza/zk/ZkLock.java | 4 +++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java index 27e8437775..344de9838e 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java @@ -24,7 +24,7 @@ public interface Lock { /** * Acquires the lock */ - void lock(); + boolean lock(); /** * Releases the lock diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 0339d139fe..711922ae09 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -40,7 +40,6 @@ import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.DistributedLock; -import org.apache.samza.coordinator.Lock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 40567de457..5b09286a54 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -62,7 +62,7 @@ public ZkLock(String participantId, ZkUtils zkUtils) { * If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override - public void lock() { + public boolean lock() { try { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); } catch (Exception e) { @@ -82,6 +82,7 @@ public void lock() { hasLock.set(true); if (zkLockListener != null) { zkLockListener.onAcquiringLock(); + return true; } else { throw new SamzaException("LockListener unassigned."); } @@ -113,6 +114,7 @@ public void lock() { LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); lock(); } + return false; } } From 5cf9d37d2f0a0dc89004ea00405eb3b3abd488f7 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 17:25:31 -0700 Subject: [PATCH 10/12] Add lock timeout --- .../main/java/org/apache/samza/zk/ZkLock.java | 102 ++++++------------ 1 file changed, 34 insertions(+), 68 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 5b09286a54..2714955ff2 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -18,11 +18,15 @@ */ package org.apache.samza.zk; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.List; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.I0Itec.zkclient.IZkDataListener; import org.apache.samza.SamzaException; +import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.Lock; import org.apache.samza.coordinator.LockListener; import org.slf4j.Logger; @@ -36,99 +40,74 @@ public class ZkLock implements Lock { public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); private final static String LOCK_PATH = "lock"; + private final ZkConfig zkConfig; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; - private final IZkDataListener previousProcessorChangeListener; private final Random random = new Random(); - private String currentSubscription = null; private String nodePath = null; private LockListener zkLockListener = null; private AtomicBoolean hasLock; - public ZkLock(String participantId, ZkUtils zkUtils) { + public ZkLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { + this.zkConfig = zkConfig; this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); - this.previousProcessorChangeListener = new PreviousLockProcessorChangeListener(); lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); this.hasLock = new AtomicBoolean(false); } /** - * Create a sequential ephemeral node to acquire the lock. - * If the path of this node has the lowest sequence number, the processor has acquired the lock. + * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override public boolean lock() { - try { - nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - } catch (Exception e) { - if (zkLockListener != null) { - zkLockListener.onError(); - } - } - List children = zkUtils.getZkClient().getChildren(lockPath); - int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - if (children.size() == 0 || index == -1) { - throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); - } - - if (index == 0) { - LOG.info("Acquired lock for participant id: {}", participantId); - hasLock.set(true); - if (zkLockListener != null) { - zkLockListener.onAcquiringLock(); - return true; - } else { - throw new SamzaException("LockListener unassigned."); - } - } else { - String predecessor = children.get(index - 1); - if (!predecessor.equals(currentSubscription)) { - if (currentSubscription != null) { - zkUtils.unsubscribeDataChanges(lockPath + "/" + currentSubscription, - previousProcessorChangeListener); + //Start timer for timeout + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build()); + scheduler.schedule(() -> { + if (!hasLock.get()) { + throw new SamzaException("Timed out while acquiring lock for creating intermediate streams."); } - currentSubscription = predecessor; - zkUtils.subscribeDataChanges(lockPath + "/" + currentSubscription, - previousProcessorChangeListener); + }, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS); + + while (!hasLock.get()) { + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); } - /** - * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes - * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't - * exist during subscription, it is not going to get created in the future. - */ - boolean predecessorExists = zkUtils.exists(lockPath + "/" + currentSubscription); - if (predecessorExists) { - LOG.info("Predecessor still exists. Current subscription is valid. Continuing as non-lockholder."); + // Acquires lock when the node has the lowest sequence number and returns. + if (index == 0) { + hasLock.set(true); + LOG.info("Acquired lock for participant id: {}", participantId); + return true; } else { + // Keep trying to acquire the lock try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { Thread.interrupted(); } - LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); - lock(); + LOG.info("Trying to acquire lock again..."); } - return false; } + return hasLock.get(); } - /** - * Delete the ephemeral sequential node created to acquire the lock. + * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. */ @Override public void unlock() { if (nodePath != null) { - Boolean status = false; - while (!status) { - status = zkUtils.getZkClient().delete(nodePath); - } + zkUtils.getZkClient().delete(nodePath); hasLock.set(false); nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); @@ -147,17 +126,4 @@ public void setLockListener(LockListener listener) { this.zkLockListener = listener; } - class PreviousLockProcessorChangeListener implements IZkDataListener { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - LOG.debug("Data change on path: " + dataPath + " Data: " + data); - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - LOG.info("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."); - lock(); - } - } - } \ No newline at end of file From 6a790b291fc03d80877bd37280800344c52c6e39 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 16 Aug 2017 14:31:19 -0700 Subject: [PATCH 11/12] Removing existStreams --- .../org/apache/samza/system/SystemAdmin.java | 9 ------ .../samza/coordinator/LockListener.java | 30 ------------------- .../apache/samza/execution/StreamManager.java | 15 ---------- .../java/org/apache/samza/zk/TestZkLock.java | 23 -------------- .../samza/system/kafka/KafkaSystemAdmin.scala | 12 +------- 5 files changed, 1 insertion(+), 88 deletions(-) delete mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java delete mode 100644 samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 983c4648d9..b18071243d 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,13 +114,4 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } - - /** - * Check if the stream described by the spec already exists. - * @param streamSpec The spec, or blueprint for the physical stream on the system. - * @return true if stream exists already, false otherwise - */ - default boolean existStream(StreamSpec streamSpec) { - return false; - } } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java deleted file mode 100644 index f80da793e6..0000000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.coordinator; - -public interface LockListener { - - public void onAcquiringLock(); - - /** - * Perform the necessary operation when a notification about acquire lock/release lock/error has been received. - */ - public void onError(); - -} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c2eb78a7e6..c6ab036228 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,19 +75,4 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } - - /** - * Check if the streams described by the specs already exist. - * @param streams A list of stream specs, whose existence we need to check for - * @return true if all the streams exist already, false otherwise - */ - public boolean checkIfStreamsExist(List streams) { - for (StreamSpec spec: streams) { - SystemAdmin systemAdmin = sysAdmins.get(spec.getSystemName()); - if (!systemAdmin.existStream(spec)) { - return false; - } - } - return true; - } } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java deleted file mode 100644 index 50d55702a6..0000000000 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.zk; - -public class TestZkLock { -} diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 76b668e819..b8d204b1cf 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -518,17 +518,7 @@ class KafkaSystemAdmin( */ override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) - } - - /** - * Checks for the existence of a stream defined by its spec, in Kafka. - * @param spec The spec, or blueprint from which the physical stream is created on the system. - */ - override def existStream(spec: StreamSpec): Boolean = { - val topicName = spec.getPhysicalName() - val zkClient = connectZk() - return AdminUtils.topicExists(zkClient, topicName) - } + } /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; From ad94c912965b46e4e98d2f8bbe72ddc8be4ff768 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 16 Aug 2017 15:37:28 -0700 Subject: [PATCH 12/12] Rebasing with master --- .../org/apache/samza/coordinator/Lock.java | 38 ------- .../samza/runtime/LocalApplicationRunner.java | 37 +++---- .../apache/samza/zk/ZkCoordinationUtils.java | 2 +- .../apache/samza/zk/ZkDistributedLock.java | 102 ------------------ .../main/java/org/apache/samza/zk/ZkLock.java | 55 +++------- .../runtime/TestLocalApplicationRunner.java | 48 ++------- 6 files changed, 37 insertions(+), 245 deletions(-) delete mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/Lock.java delete mode 100644 samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java deleted file mode 100644 index 344de9838e..0000000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.coordinator; - -public interface Lock { - - /** - * Acquires the lock - */ - boolean lock(); - - /** - * Releases the lock - */ - void unlock(); - - boolean hasLock(); - - void setLockListener(LockListener listener); - -} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 711922ae09..71b76bd37d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -30,7 +29,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; @@ -38,6 +36,7 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.execution.ExecutionPlan; @@ -48,8 +47,7 @@ import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.task.TaskFactoryUtil; -import org.apache.samza.zk.ZkCoordinationServiceFactory; -import org.apache.samza.zk.ZkJobCoordinatorFactory; +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +60,9 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"; // Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently. private static final long LOCK_TIMEOUT_SECONDS = 10; + private static final String ZK_COORDINATION_CLASS = "org.apache.samza.zk.ZkJobCoordinatorFactory"; + private static final String ZK_COORDINATION_SERVICE_CLASS = "org.apache.samza.zk.ZkCoordinationServiceFactory"; + private final String uid; private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); @@ -213,47 +214,37 @@ public void waitForFinish() { String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); // TODO: we will need a better way to package the configs with application runner - if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) { + if (ZK_COORDINATION_CLASS.equals(jobCoordinatorFactoryClassName)) { ApplicationConfig appConfig = new ApplicationConfig(config); - return new ZkCoordinationServiceFactory().getCoordinationService( + return Util.getObj(ZK_COORDINATION_SERVICE_CLASS).getCoordinationService( appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config); } else { return null; } } - /** - * Generates a unique lock ID which is consistent for all processors within the same application lifecycle. - * Each {@code StreamSpec} has an ID that is unique and is used for hashcode computation. - * @param intStreams list of {@link StreamSpec}s - * @return lock ID - */ - private String generateLockId(List intStreams) { - return String.valueOf(Objects.hashCode(intStreams.stream() - .map(StreamSpec::getId) - .collect(Collectors.toList()))); - } - /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader - * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes - * stream creation. + * If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams. + * All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time. + * After stream creation, they will unlock and proceed normally. * @param planId a unique identifier representing the plan used for coordination purpose * @param intStreams list of intermediate {@link StreamSpec}s * @throws TimeoutException exception for lock timeout */ - /* package private */ void createStreams(String planId, List intStreams) throws TimeoutException { + /* package private */ void createStreams(String planId, List intStreams) throws TimeoutException { if (!intStreams.isEmpty()) { // Move the scope of coordination utils within stream creation to address long idle connection problem. // Refer SAMZA-1385 for more details CoordinationUtils coordinationUtils = createCoordinationUtils(); if (coordinationUtils != null) { - DistributedLock initLock = coordinationUtils.getLock(generateLockId(intStreams)); + DistributedLock initLock = coordinationUtils.getLock(planId); try { boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (hasLock) { getStreamManager().createStreams(intStreams); + LOG.info("Created intermediate streams. Unlocking now..."); + initLock.unlock(); } else { LOG.error("Timed out while trying to acquire lock.", new TimeoutException()); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index 3fd7b75a60..4b50d80b34 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -63,7 +63,7 @@ public Latch getLatch(int size, String latchId) { @Override public DistributedLock getLock(String initLockPath) { - return new ZkDistributedLock(processorIdStr, zkUtils, initLockPath); + return new ZkLock(processorIdStr, zkUtils, initLockPath); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java deleted file mode 100644 index 71723739c6..0000000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.zk; - -import java.util.List; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.DistributedLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Distributed lock primitive for Zookeeper. - */ -public class ZkDistributedLock implements DistributedLock { - - public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); - private final ZkUtils zkUtils; - private final String lockPath; - private final String participantId; - private final ZkKeyBuilder keyBuilder; - private final Random random = new Random(); - private String nodePath = null; - - public ZkDistributedLock(String participantId, ZkUtils zkUtils, String initLockPath) { - this.zkUtils = zkUtils; - this.participantId = participantId; - this.keyBuilder = this.zkUtils.getKeyBuilder(); - lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); - zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); - } - - /** - * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. - * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. - * @param timeout Duration of lock acquiring timeout. - * @param unit Unit of the timeout defined above. - * @return true if lock is acquired successfully, false if it times out. - */ - @Override - public boolean lock(long timeout, TimeUnit unit) { - nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - - //Start timer for timeout - long startTime = System.currentTimeMillis(); - long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - - while ((System.currentTimeMillis() - startTime) < lockTimeout) { - List children = zkUtils.getZkClient().getChildren(lockPath); - int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); - - if (children.size() == 0 || index == -1) { - throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); - } - // Acquires lock when the node has the lowest sequence number and returns. - if (index == 0) { - LOG.info("Acquired lock for participant id: {}", participantId); - return true; - } else { - try { - Thread.sleep(random.nextInt(1000)); - } catch (InterruptedException e) { - Thread.interrupted(); - } - LOG.info("Trying to acquire lock again..."); - } - } - return false; - } - - /** - * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. - */ - @Override - public void unlock() { - if (nodePath != null) { - zkUtils.getZkClient().delete(nodePath); - nodePath = null; - LOG.info("Ephemeral lock node deleted. Unlocked!"); - } else { - LOG.warn("Ephemeral lock node you want to delete doesn't exist"); - } - } -} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 2714955ff2..b8ceda490d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -18,65 +18,52 @@ */ package org.apache.samza.zk; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.List; import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.samza.SamzaException; -import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.Lock; -import org.apache.samza.coordinator.LockListener; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Lock primitive for Zookeeper. + * Distributed lock primitive for Zookeeper. */ -public class ZkLock implements Lock { +public class ZkLock implements DistributedLock { public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); - private final static String LOCK_PATH = "lock"; - private final ZkConfig zkConfig; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; private final Random random = new Random(); private String nodePath = null; - private LockListener zkLockListener = null; - private AtomicBoolean hasLock; - public ZkLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { - this.zkConfig = zkConfig; + public ZkLock(String participantId, ZkUtils zkUtils, String initLockPath) { this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); - lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); - this.hasLock = new AtomicBoolean(false); } /** * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. */ @Override - public boolean lock() { + public boolean lock(long timeout, TimeUnit unit) { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); //Start timer for timeout - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build()); - scheduler.schedule(() -> { - if (!hasLock.get()) { - throw new SamzaException("Timed out while acquiring lock for creating intermediate streams."); - } - }, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS); + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - while (!hasLock.get()) { + while ((System.currentTimeMillis() - startTime) < lockTimeout) { List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -85,11 +72,9 @@ public boolean lock() { } // Acquires lock when the node has the lowest sequence number and returns. if (index == 0) { - hasLock.set(true); LOG.info("Acquired lock for participant id: {}", participantId); return true; } else { - // Keep trying to acquire the lock try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { @@ -98,7 +83,7 @@ public boolean lock() { LOG.info("Trying to acquire lock again..."); } } - return hasLock.get(); + return false; } /** @@ -108,22 +93,10 @@ public boolean lock() { public void unlock() { if (nodePath != null) { zkUtils.getZkClient().delete(nodePath); - hasLock.set(false); nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); } else { - LOG.info("Ephemeral lock node doesn't exist"); + LOG.warn("Ephemeral lock node you want to delete doesn't exist"); } } - - @Override - public boolean hasLock() { - return hasLock.get(); - } - - @Override - public void setLockListener(LockListener listener) { - this.zkLockListener = listener; - } - } \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 6c7827e9ac..b84d4fce8e 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; @@ -34,9 +33,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.LeaderElectorListener; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.execution.ExecutionPlanner; import org.apache.samza.execution.StreamManager; @@ -48,7 +45,6 @@ import org.mockito.ArgumentCaptor; import static org.junit.Assert.*; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; @@ -162,46 +158,18 @@ public String getPlanAsJson() LocalApplicationRunner spy = spy(runner); CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); - LeaderElector leaderElector = new LeaderElector() { - private LeaderElectorListener leaderElectorListener; + DistributedLock lock = new DistributedLock() { @Override - public void setLeaderElectorListener(LeaderElectorListener listener) { - this.leaderElectorListener = listener; + public boolean lock(long timeout, TimeUnit unit) { + return true; } @Override - public void tryBecomeLeader() { - leaderElectorListener.onBecomingLeader(); - } - - @Override - public void resignLeadership() {} - - @Override - public boolean amILeader() { - return false; - } + public void unlock() {} }; - Latch latch = new Latch() { - boolean done = false; - @Override - public void await(long timeout, TimeUnit tu) - throws TimeoutException { - // in this test, latch is released after countDown is invoked - if (!done) { - throw new TimeoutException("timed out waiting for the target path"); - } - } - - @Override - public void countDown() { - done = true; - } - }; - when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector); - when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch); + when(coordinationUtils.getLock(anyString())).thenReturn(lock); doReturn(coordinationUtils).when(spy).createCoordinationUtils(); try { @@ -361,7 +329,7 @@ public String getPlanAsJson() @Test public void testPlanIdWithShuffledStreamSpecs() { List streamSpecs = ImmutableList.of( - new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-1", "stream-1", "testStream"), new StreamSpec("test-stream-2", "stream-2", "testStream"), new StreamSpec("test-stream-3", "stream-3", "testStream")); String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); @@ -430,4 +398,4 @@ private String streamSpecToJson(StreamSpec streamSpec) { streamSpec.getSystemName(), streamSpec.getPhysicalName()); } -} +} \ No newline at end of file