From 3c668d9dc6d75c1cff27c10a5bfdc0b5fee94ede Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Fri, 4 Aug 2017 13:01:08 -0700 Subject: [PATCH 01/11] replace ZkCoordinationUtils with StandAloneCoordinationUtils --- .../CoordinationServiceFactory.java | 36 ------- .../samza/coordinator/CoordinationUtils.java | 44 --------- .../samza/coordinator/LeaderElector.java | 2 + .../StandAloneCoordinationUtils.java | 93 +++++++++++++++++++ .../samza/runtime/LocalApplicationRunner.java | 48 ++++------ .../zk/ZkCoordinationServiceFactory.java | 89 ------------------ .../apache/samza/zk/ZkCoordinationUtils.java | 67 ------------- .../org/apache/samza/zk/ZkJobCoordinator.java | 1 + .../org/apache/samza/zk/ZkLeaderElector.java | 6 ++ .../org/apache/samza/zk/ZkProcessorLatch.java | 6 ++ .../java/org/apache/samza/zk/ZkUtils.java | 2 - .../org/apache/samza/coordinator/Latch.java | 2 + .../runtime/TestLocalApplicationRunner.java | 12 ++- .../apache/samza/zk/TestZkLeaderElector.java | 4 +- .../org/apache/samza/zk/TestZkNamespace.java | 9 +- .../apache/samza/zk/TestZkProcessorLatch.java | 3 +- .../java/org/apache/samza/zk/TestZkUtils.java | 3 +- 17 files changed, 149 insertions(+), 278 deletions(-) delete mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java delete mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java delete mode 100644 samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java delete mode 100644 samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java deleted file mode 100644 index 497d3e0f00..0000000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.coordinator; - -import org.apache.samza.config.Config; - - -/** - * factory to instantiate a c{@link CoordinationUtils} service - */ -public interface CoordinationServiceFactory { - /** - * get a unique service instance - * @param groupId - unique id to identify the service - * @param participantId - a unique id that identifies the participant in the service - * @param updatedConfig - configs, to define the details of the service - * @return a unique service instance - */ - CoordinationUtils getCoordinationService(String groupId, String participantId, Config updatedConfig); -} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java deleted file mode 100644 index 150b3d4fcc..0000000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.coordinator; - -import org.apache.samza.annotation.InterfaceStability; - -/** - * - * Coordination service provides synchronization primitives. - * The actual implementation (for example ZK based) is left to each implementation class. - * This service provide three primitives: - * - LeaderElection - * - Latch - */ -@InterfaceStability.Evolving -public interface CoordinationUtils { - - /** - * reset the internal structure. Does not happen automatically with stop() - */ - void reset(); - - - // facilities for group coordination - LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId - - Latch getLatch(int size, String latchId); -} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java index c624f835d5..5283a7b97a 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java @@ -56,4 +56,6 @@ public interface LeaderElector { * @return True, if the caller is the current leader. False, otherwise */ boolean amILeader(); + + void close(); } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java new file mode 100644 index 0000000000..09054a4201 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java @@ -0,0 +1,93 @@ +package org.apache.samza.coordinator; + +import com.google.common.base.Strings; +import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.apache.samza.zk.ZkJobCoordinatorFactory; +import org.apache.samza.zk.ZkJobCoordinatorMetrics; +import org.apache.samza.zk.ZkKeyBuilder; +import org.apache.samza.zk.ZkLeaderElector; +import org.apache.samza.zk.ZkProcessorLatch; +import org.apache.samza.zk.ZkUtils; +import org.apache.zookeeper.client.ConnectStringParser; +import org.slf4j.Logger; + + +public class StandAloneCoordinationUtils { + private final String participantId; + private final ZkUtils zkUtils; + + private final static Logger LOG = org.slf4j.LoggerFactory.getLogger(StandAloneCoordinationUtils.class); + + public StandAloneCoordinationUtils(String groupId, String participantId, Config config) { + + // currently we figure out if it is ZK based utilities by checking JobCoordinatorConfig.JOB_COORDINATOR_FACTORY + String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); + + if (!ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) + throw new SamzaException(String.format("Samza supports only ZK based coordination utilities with %s == %s", + JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName())); + + // create ZK based utils + this.participantId = participantId; + ZkConfig zkConfig = new ZkConfig(config); + + ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + + this.zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); + LOG.info("Created CoordinationUtis1. zkUtils = " + zkUtils + "; groupid=" + groupId); + } + + public LeaderElector getLeaderElector() { + LOG.info("getting LE. participant=" + participantId + ";zkutils=" + zkUtils); + ZkLeaderElector le = new ZkLeaderElector(participantId, zkUtils); + LOG.info("got LE=" + le); + return le; + } + + public Latch getLatch(int size, String latchId) { + LOG.info("getting latch: size=" + size + ";id = " + latchId); + Latch l = new ZkProcessorLatch(size, latchId, participantId, zkUtils); + LOG.info("got latch:" + l); + return l; + } + + public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { + ZkClient zkClient; + try { + zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); + } catch (Exception e) { + // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. + throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); + } + + // make sure the namespace in zk exists (if specified) + validateZkNameSpace(connectString, zkClient); + + return zkClient; + } + /** + * if ZkConnectString contains namespace path at the end, but it does not exist we should fail + * @param zkConnect - connect string + * @param zkClient - zkClient object to talk to the ZK + */ + public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) { + ConnectStringParser parser = new ConnectStringParser(zkConnect); + + String path = parser.getChrootPath(); + if (Strings.isNullOrEmpty(path)) { + return; // no namespace path + } + + //LOG.info("connectString = " + zkConnect + "; path =" + path); + + // if namespace specified (path above) but "/" does not exists, we will fail + if (!zkClient.exists("/")) { + throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect); + } + } +} diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index b0bfc8a62d..767efb1759 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -30,12 +30,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; -import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.execution.ExecutionPlan; @@ -46,8 +44,6 @@ import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.task.TaskFactoryUtil; -import org.apache.samza.zk.ZkCoordinationServiceFactory; -import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +59,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final long LATCH_TIMEOUT_MINUTES = 10; private final String uid; - private final CoordinationUtils coordinationUtils; + private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); @@ -122,9 +118,6 @@ private void shutdownAndNotify() { } } - if (coordinationUtils != null) { - coordinationUtils.reset(); - } shutdownLatch.countDown(); } } @@ -132,7 +125,6 @@ private void shutdownAndNotify() { public LocalApplicationRunner(Config config) { super(config); uid = UUID.randomUUID().toString(); - coordinationUtils = createCoordinationUtils(); } @Override @@ -206,25 +198,9 @@ public void waitForFinish() { } } - /** - * Create the {@link CoordinationUtils} needed by the application runner, or null if it's not configured. - * @return an instance of {@link CoordinationUtils} - */ - /* package private */ CoordinationUtils createCoordinationUtils() { - String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); - - // TODO: we will need a better way to package the configs with application runner - if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) { - ApplicationConfig appConfig = new ApplicationConfig(config); - return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config); - } else { - return null; - } - } - /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader + * If {@link StandAloneCoordinationUtils} is provided, this function will first invoke leader election, and the leader * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes * stream creation. * @param intStreams list of intermediate {@link StreamSpec}s @@ -232,15 +208,27 @@ public void waitForFinish() { */ /* package private */ void createStreams(List intStreams) throws Exception { if (!intStreams.isEmpty()) { - if (coordinationUtils != null) { - Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID); - LeaderElector leaderElector = coordinationUtils.getLeaderElector(); + StandAloneCoordinationUtils coordinationUtils1 = null; + try { + coordinationUtils1 = new StandAloneCoordinationUtils("APP_ID", uid, config); + } catch (SamzaException e) { + LOG.warn("Coordination utils are not available.", e); + } + + if (coordinationUtils1 != null) { + Latch initLatch = coordinationUtils1.getLatch(1, INIT_LATCH_ID); + LeaderElector leaderElector = coordinationUtils1.getLeaderElector(); + leaderElector.setLeaderElectorListener(() -> { getStreamManager().createStreams(intStreams); initLatch.countDown(); }); leaderElector.tryBecomeLeader(); initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); + + // close the connections + leaderElector.close(); + initLatch.close(); } else { // each application process will try creating the streams, which // requires stream creation to be idempotent diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java deleted file mode 100644 index 1dd5ec9362..0000000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.zk; - -import com.google.common.base.Strings; -import org.I0Itec.zkclient.ZkClient; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.CoordinationServiceFactory; -import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.util.NoOpMetricsRegistry; -import org.apache.zookeeper.client.ConnectStringParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { - private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); - - public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { - ZkConfig zkConfig = new ZkConfig(config); - - ZkClient zkClient = - createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - - ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); - return new ZkCoordinationUtils(participantId, zkConfig, zkUtils); - } - - /** - * helper method to create zkClient - * @param connectString - zkConnect string - * @param sessionTimeoutMS - session timeout - * @param connectionTimeoutMs - connection timeout - * @return zkClient object - */ - public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { - ZkClient zkClient; - try { - zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); - } catch (Exception e) { - // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. - throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); - } - - // make sure the namespace in zk exists (if specified) - validateZkNameSpace(connectString, zkClient); - - return zkClient; - } - - /** - * if ZkConnectString contains namespace path at the end, but it does not exist we should fail - * @param zkConnect - connect string - * @param zkClient - zkClient object to talk to the ZK - */ - public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) { - ConnectStringParser parser = new ConnectStringParser(zkConnect); - - String path = parser.getChrootPath(); - if (Strings.isNullOrEmpty(path)) { - return; // no namespace path - } - - LOG.info("connectString = " + zkConnect + "; path =" + path); - - // if namespace specified (path above) but "/" does not exists, we will fail - if (!zkClient.exists("/")) { - throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect); - } - } -} diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java deleted file mode 100644 index f5dda2e880..0000000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.zk; - -import org.I0Itec.zkclient.exception.ZkInterruptedException; -import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class ZkCoordinationUtils implements CoordinationUtils { - private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtils.class); - - public final ZkConfig zkConfig; - public final ZkUtils zkUtils; - public final String processorIdStr; - - public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils) { - this.zkConfig = zkConfig; - this.zkUtils = zkUtils; - this.processorIdStr = processorId; - } - - @Override - public void reset() { - try { - zkUtils.close(); - } catch (ZkInterruptedException ex) { - // Swallowing due to occurrence in the last stage of lifecycle(Not actionable). - LOG.error("Exception in reset: ", ex); - } - } - - @Override - public LeaderElector getLeaderElector() { - return new ZkLeaderElector(processorIdStr, zkUtils); - } - - @Override - public Latch getLatch(int size, String latchId) { - return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils); - } - - // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer - public ZkUtils getZkUtils() { - return zkUtils; - } -} diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 9f64b3aba6..5ad2c1a626 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -33,6 +33,7 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.ZkConfig; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.coordinator.JobModelManager; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index 97430cbde8..40ec3ef423 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -169,6 +169,12 @@ private String zLog(String logMessage) { return String.format("[Processor-%s] %s", processorIdStr, logMessage); } + @Override + public void close() { + if(zkUtils != null) + zkUtils.close(); + } + // Only by non-leaders class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener { diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java index ddc897665a..6f09bf6f7f 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java @@ -63,4 +63,10 @@ public void countDown() { String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId); LOGGER.debug("ZKProcessorLatch countDown created " + path); } + + @Override + public void close() { + if(zkUtils != null) + zkUtils.close(); + } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 5df7114730..2cb09da29c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -90,8 +90,6 @@ public int getGeneration() { return currentGeneration.get(); } - - public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) { this.keyBuilder = zkKeyBuilder; this.connectionTimeoutMs = connectionTimeoutMs; diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java index 5ca91380f9..4fd4e1e4f5 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java @@ -30,4 +30,6 @@ public interface Latch { void await(long timeout, TimeUnit tu) throws TimeoutException; void countDown(); + + void close(); } diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index a04bd3b06f..cb8e8c90fd 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -24,7 +24,7 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; -import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.coordinator.LeaderElectorListener; @@ -152,7 +152,7 @@ public String getPlanAsJson() LocalApplicationRunner spy = spy(runner); - CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); + StandAloneCoordinationUtils coordinationUtils = mock(StandAloneCoordinationUtils.class); LeaderElector leaderElector = new LeaderElector() { private LeaderElectorListener leaderElectorListener; @@ -173,6 +173,9 @@ public void resignLeadership() {} public boolean amILeader() { return false; } + + @Override + public void close() { } }; Latch latch = new Latch() { @@ -188,10 +191,13 @@ public void await(long timeout, TimeUnit tu) public void countDown() { done = true; } + + @Override + public void close() { } }; when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector); when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch); - doReturn(coordinationUtils).when(spy).createCoordinationUtils(); + //doReturn(coordinationUtils).when(spy).createCoordinationUtils(); try { spy.run(app); diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 3ff91757d4..eb37b0ff7b 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -27,6 +27,7 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; @@ -433,7 +434,8 @@ public void testAmILeader() { } private ZkUtils getZkUtilsWithNewClient() { - ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); + ZkClient zkClient = StandAloneCoordinationUtils + .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, zkClient, diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java index 3ce203e1b7..45db15fe7b 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java @@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.AfterClass; import org.junit.Assert; @@ -90,7 +91,7 @@ private void testDoNotFailIfNameSpacePresent(String zkNameSpace) { String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace; createNamespace(zkNameSpace); initZk(zkConnect); - ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); zkClient.createPersistent("/test"); zkClient.createPersistent("/test/test1"); @@ -106,7 +107,7 @@ public void testValidateFailZkNameSpace1LevelPath() { try { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace"; initZk(zkConnect); - ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); Assert.fail("1.Should fail with exception, because namespace doesn't exist"); } catch (SamzaException e) { // expected @@ -120,7 +121,7 @@ public void testValidateFailZkNameSpace2LevelPath() { try { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz"; initZk(zkConnect); - ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); Assert.fail("2.Should fail with exception, because namespace doesn't exist"); } catch (SamzaException e) { // expected @@ -134,7 +135,7 @@ public void testValidateFailZkNameSpaceEmptyPath() { // should succeed, because no namespace provided String zkConnect = "127.0.0.1:" + zkServer.getPort() + ""; initZk(zkConnect); - ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); tearDownZk(); } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index b2a5533213..efc3b09972 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.coordinator.Latch; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; @@ -215,7 +216,7 @@ public void testLatchExpires() { } private ZkUtils getZkUtilsWithNewClient(String processorId) { - ZkClient zkClient = ZkCoordinationServiceFactory + ZkClient zkClient = StandAloneCoordinationUtils .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index b5953d1a5f..93f352286c 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -121,7 +121,8 @@ public void testGetProcessorsIDs() { zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1")); List l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(1, l.size()); - new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2")); + new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()). + registerProcessorAndGetId(new ProcessorData("host2", "2")); l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(2, l.size()); From aeb8ceae176227780ef050700c239492dd95a466 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Fri, 4 Aug 2017 13:12:14 -0700 Subject: [PATCH 02/11] checkstyle --- .../StandAloneCoordinationUtils.java | 22 +++++++++++++++++-- .../org/apache/samza/zk/ZkJobCoordinator.java | 3 +-- .../samza/zk/ZkJobCoordinatorFactory.java | 3 ++- .../org/apache/samza/zk/ZkLeaderElector.java | 2 +- .../org/apache/samza/zk/ZkProcessorLatch.java | 2 +- 5 files changed, 25 insertions(+), 7 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java index 09054a4201..d49e38f1bb 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.coordinator; import com.google.common.base.Strings; @@ -8,7 +27,6 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.util.NoOpMetricsRegistry; import org.apache.samza.zk.ZkJobCoordinatorFactory; -import org.apache.samza.zk.ZkJobCoordinatorMetrics; import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkLeaderElector; import org.apache.samza.zk.ZkProcessorLatch; @@ -38,7 +56,7 @@ public StandAloneCoordinationUtils(String groupId, String participantId, Config ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - this.zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); + this.zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); LOG.info("Created CoordinationUtis1. zkUtils = " + zkUtils + "; groupid=" + groupId); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 5ad2c1a626..5614309c0c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -24,8 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import org.I0Itec.zkclient.IZkStateListener; import java.util.Set; +import org.I0Itec.zkclient.IZkStateListener; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; @@ -33,7 +33,6 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.coordinator.JobModelManager; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 85e3b4ab55..c60e4146d4 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -25,6 +25,7 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.coordinator.StandAloneCoordinationUtils; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; @@ -51,7 +52,7 @@ public JobCoordinator getJobCoordinator(Config config) { private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) { ZkConfig zkConfig = new ZkConfig(config); ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); - ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + ZkClient zkClient = StandAloneCoordinationUtils.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index 40ec3ef423..b182214ef7 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -171,7 +171,7 @@ private String zLog(String logMessage) { @Override public void close() { - if(zkUtils != null) + if (zkUtils != null) zkUtils.close(); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java index 6f09bf6f7f..a6c8368dc7 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java @@ -66,7 +66,7 @@ public void countDown() { @Override public void close() { - if(zkUtils != null) + if (zkUtils != null) zkUtils.close(); } } From 8b097591cae76279b53e347bb085af3436d92193 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Tue, 8 Aug 2017 11:05:24 -0700 Subject: [PATCH 03/11] added interface --- .../samza/coordinator/CoordinationUtils.java | 54 +++++++++++++++++++ ...ionUtils.java => ZkCoordinationUtils.java} | 21 +++----- .../samza/runtime/LocalApplicationRunner.java | 14 ++--- .../samza/zk/ZkJobCoordinatorFactory.java | 5 +- .../runtime/TestLocalApplicationRunner.java | 4 +- .../apache/samza/zk/TestZkLeaderElector.java | 4 +- .../org/apache/samza/zk/TestZkNamespace.java | 10 ++-- .../apache/samza/zk/TestZkProcessorLatch.java | 4 +- 8 files changed, 82 insertions(+), 34 deletions(-) create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java rename samza-core/src/main/java/org/apache/samza/coordinator/{StandAloneCoordinationUtils.java => ZkCoordinationUtils.java} (79%) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java new file mode 100644 index 0000000000..c85158fc06 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -0,0 +1,54 @@ +/* + - * Licensed to the Apache Software Foundation (ASF) under one + - * or more contributor license agreements. See the NOTICE file + - * distributed with this work for additional information + - * regarding copyright ownership. The ASF licenses this file + - * to you under the Apache License, Version 2.0 (the + - * "License"); you may not use this file except in compliance + - * with the License. You may obtain a copy of the License at + - * + - * http://www.apache.org/licenses/LICENSE-2.0 + - * + - * Unless required by applicable law or agreed to in writing, + - * software distributed under the License is distributed on an + - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - * KIND, either express or implied. See the License for the + - * specific language governing permissions and limitations + - * under the License. + - */ +package org.apache.samza.coordinator; + +import org.apache.samza.SamzaException; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.zk.ZkJobCoordinatorFactory; + + +/** + * + * Coordination service provides synchronization primitives. + * The actual implementation (for example ZK based) is left to each implementation class. + * This service provide three primitives: + * - LeaderElection + * - Latch + */ +@InterfaceStability.Evolving +public interface CoordinationUtils { + + public static CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) { + + // currently we figure out if it is ZK based utilities by checking JobCoordinatorConfig.JOB_COORDINATOR_FACTORY + String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); + + if (!ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) + throw new SamzaException(String.format("Samza supports only ZK based coordination utilities with %s == %s", + JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName())); + + return new ZkCoordinationUtils(groupId, participantId, config); + } + + // facilities for group coordination + LeaderElector getLeaderElector(); + Latch getLatch(int size, String latchId); +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/ZkCoordinationUtils.java similarity index 79% rename from samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java rename to samza-core/src/main/java/org/apache/samza/coordinator/ZkCoordinationUtils.java index d49e38f1bb..804b93b90b 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/StandAloneCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/ZkCoordinationUtils.java @@ -23,10 +23,8 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.util.NoOpMetricsRegistry; -import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkLeaderElector; import org.apache.samza.zk.ZkProcessorLatch; @@ -35,31 +33,24 @@ import org.slf4j.Logger; -public class StandAloneCoordinationUtils { +public class ZkCoordinationUtils implements CoordinationUtils { private final String participantId; private final ZkUtils zkUtils; - private final static Logger LOG = org.slf4j.LoggerFactory.getLogger(StandAloneCoordinationUtils.class); + private final static Logger LOG = org.slf4j.LoggerFactory.getLogger(ZkCoordinationUtils.class); - public StandAloneCoordinationUtils(String groupId, String participantId, Config config) { + ZkCoordinationUtils(String groupId, String participantId, Config config) { - // currently we figure out if it is ZK based utilities by checking JobCoordinatorConfig.JOB_COORDINATOR_FACTORY - String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ""); - - if (!ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) - throw new SamzaException(String.format("Samza supports only ZK based coordination utilities with %s == %s", - JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName())); - - // create ZK based utils this.participantId = participantId; ZkConfig zkConfig = new ZkConfig(config); ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); this.zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); - LOG.info("Created CoordinationUtis1. zkUtils = " + zkUtils + "; groupid=" + groupId); + LOG.info("Created ZkCoordinationUtils. zkUtils = " + zkUtils + "; groupid=" + groupId); } + @Override public LeaderElector getLeaderElector() { LOG.info("getting LE. participant=" + participantId + ";zkutils=" + zkUtils); ZkLeaderElector le = new ZkLeaderElector(participantId, zkUtils); @@ -67,6 +58,7 @@ public LeaderElector getLeaderElector() { return le; } + @Override public Latch getLatch(int size, String latchId) { LOG.info("getting latch: size=" + size + ";id = " + latchId); Latch l = new ZkProcessorLatch(size, latchId, participantId, zkUtils); @@ -74,6 +66,7 @@ public Latch getLatch(int size, String latchId) { return l; } + // static auxiliary methods public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { ZkClient zkClient; try { diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 767efb1759..7d7bab6aad 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -33,7 +33,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfig; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; +import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.execution.ExecutionPlan; @@ -200,7 +200,7 @@ public void waitForFinish() { /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link StandAloneCoordinationUtils} is provided, this function will first invoke leader election, and the leader + * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes * stream creation. * @param intStreams list of intermediate {@link StreamSpec}s @@ -208,16 +208,16 @@ public void waitForFinish() { */ /* package private */ void createStreams(List intStreams) throws Exception { if (!intStreams.isEmpty()) { - StandAloneCoordinationUtils coordinationUtils1 = null; + CoordinationUtils coordinationUtils = null; try { - coordinationUtils1 = new StandAloneCoordinationUtils("APP_ID", uid, config); + coordinationUtils = CoordinationUtils.getCoordinationUtils("APP_ID", uid, config); } catch (SamzaException e) { LOG.warn("Coordination utils are not available.", e); } - if (coordinationUtils1 != null) { - Latch initLatch = coordinationUtils1.getLatch(1, INIT_LATCH_ID); - LeaderElector leaderElector = coordinationUtils1.getLeaderElector(); + if (coordinationUtils != null) { + Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID); + LeaderElector leaderElector = coordinationUtils.getLeaderElector(); leaderElector.setLeaderElectorListener(() -> { getStreamManager().createStreams(intStreams); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index c60e4146d4..963d1b1caf 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -25,7 +25,7 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; +import org.apache.samza.coordinator.ZkCoordinationUtils; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; @@ -52,7 +52,8 @@ public JobCoordinator getJobCoordinator(Config config) { private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) { ZkConfig zkConfig = new ZkConfig(config); ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); - ZkClient zkClient = StandAloneCoordinationUtils.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + ZkClient zkClient = ZkCoordinationUtils + .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); } } diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index cb8e8c90fd..57e07d6d12 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -24,7 +24,7 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; +import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.coordinator.LeaderElectorListener; @@ -152,7 +152,7 @@ public String getPlanAsJson() LocalApplicationRunner spy = spy(runner); - StandAloneCoordinationUtils coordinationUtils = mock(StandAloneCoordinationUtils.class); + CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); LeaderElector leaderElector = new LeaderElector() { private LeaderElectorListener leaderElectorListener; diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index eb37b0ff7b..d6eb69e5a2 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; +import org.apache.samza.coordinator.ZkCoordinationUtils; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; @@ -434,7 +434,7 @@ public void testAmILeader() { } private ZkUtils getZkUtilsWithNewClient() { - ZkClient zkClient = StandAloneCoordinationUtils + ZkClient zkClient = ZkCoordinationUtils .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java index 45db15fe7b..d2af45ada5 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java @@ -23,7 +23,7 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; +import org.apache.samza.coordinator.ZkCoordinationUtils; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.AfterClass; import org.junit.Assert; @@ -91,7 +91,7 @@ private void testDoNotFailIfNameSpacePresent(String zkNameSpace) { String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace; createNamespace(zkNameSpace); initZk(zkConnect); - StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); zkClient.createPersistent("/test"); zkClient.createPersistent("/test/test1"); @@ -107,7 +107,7 @@ public void testValidateFailZkNameSpace1LevelPath() { try { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace"; initZk(zkConnect); - StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); Assert.fail("1.Should fail with exception, because namespace doesn't exist"); } catch (SamzaException e) { // expected @@ -121,7 +121,7 @@ public void testValidateFailZkNameSpace2LevelPath() { try { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz"; initZk(zkConnect); - StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); Assert.fail("2.Should fail with exception, because namespace doesn't exist"); } catch (SamzaException e) { // expected @@ -135,7 +135,7 @@ public void testValidateFailZkNameSpaceEmptyPath() { // should succeed, because no namespace provided String zkConnect = "127.0.0.1:" + zkServer.getPort() + ""; initZk(zkConnect); - StandAloneCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); tearDownZk(); } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index efc3b09972..6cbc2d6063 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.StandAloneCoordinationUtils; +import org.apache.samza.coordinator.ZkCoordinationUtils; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; @@ -216,7 +216,7 @@ public void testLatchExpires() { } private ZkUtils getZkUtilsWithNewClient(String processorId) { - ZkClient zkClient = StandAloneCoordinationUtils + ZkClient zkClient = ZkCoordinationUtils .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, From f1881b1e65d1ffb67c9d669192c6e0d5fe90a079 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Tue, 8 Aug 2017 10:33:26 -0700 Subject: [PATCH 04/11] Add lock --- .../org/apache/samza/coordinator/Lock.java | 38 +++++ .../samza/coordinator/LockListener.java | 30 ++++ .../main/java/org/apache/samza/zk/ZkLock.java | 152 ++++++++++++++++++ .../java/org/apache/samza/zk/TestZkLock.java | 23 +++ 4 files changed, 243 insertions(+) create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/Lock.java create mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java create mode 100644 samza-core/src/main/java/org/apache/samza/zk/ZkLock.java create mode 100644 samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java new file mode 100644 index 0000000000..27e8437775 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +public interface Lock { + + /** + * Acquires the lock + */ + void lock(); + + /** + * Releases the lock + */ + void unlock(); + + boolean hasLock(); + + void setLockListener(LockListener listener); + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java new file mode 100644 index 0000000000..f80da793e6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +public interface LockListener { + + public void onAcquiringLock(); + + /** + * Perform the necessary operation when a notification about acquire lock/release lock/error has been received. + */ + public void onError(); + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java new file mode 100644 index 0000000000..d34f7bf8fd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.LockListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkLock implements Lock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class); + private final ZkUtils zkUtils; + private final String lockPath; + private final String participantId; + private String currentSubscription = null; + private final ZkKeyBuilder keyBuilder; + private final IZkDataListener previousProcessorChangeListener; + private final Random random = new Random(); + private String nodePath = null; + private LockListener zkLockListener = null; + private AtomicBoolean hasLock; + private final static String LOCK_PATH = "lock"; + + public ZkLock(String participantId, ZkUtils zkUtils) { + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = this.zkUtils.getKeyBuilder(); + this.previousProcessorChangeListener = new PreviousLockProcessorChangeListener(); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); + zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); + this.hasLock = new AtomicBoolean(false); + } + + /** + * Create a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + */ + @Override + public void lock() { + try { + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); + } catch (Exception e) { + zkLockListener.onError(); + } + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + + if (index == 0) { + hasLock.set(true); + if (zkLockListener != null) { + zkLockListener.onAcquiringLock(); + } + } else { + String predecessor = children.get(index - 1); + if (!predecessor.equals(currentSubscription)) { + if (currentSubscription != null) { + zkUtils.unsubscribeDataChanges(lockPath + "/" + currentSubscription, + previousProcessorChangeListener); + } + currentSubscription = predecessor; + zkUtils.subscribeDataChanges(lockPath + "/" + currentSubscription, + previousProcessorChangeListener); + } + /** + * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes + * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't + * exist during subscription, it is not going to get created in the future. + */ + boolean predecessorExists = zkUtils.exists(lockPath + "/" + currentSubscription); + if (predecessorExists) { + LOG.info("Predecessor still exists. Current subscription is valid. Continuing as non-lockholder."); + } else { + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); + lock(); + } + } + } + + + /** + * Delete the node created to acquire the lock + */ + @Override + public void unlock() { + if (nodePath != null) { + Boolean status = false; + while (!status && nodePath != null) { + status = zkUtils.getZkClient().delete(nodePath); + } + hasLock.set(false); + nodePath = null; + LOG.info("Ephemeral lock node deleted. Unlocked!"); + } else { + LOG.info("Ephemeral lock node doesn't exist"); + } + } + + @Override + public boolean hasLock() { + return hasLock.get(); + } + + @Override + public void setLockListener(LockListener listener) { + this.zkLockListener = listener; + } + + class PreviousLockProcessorChangeListener implements IZkDataListener { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + LOG.debug("Data change on path: " + dataPath + " Data: " + data); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + LOG.info("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."); + lock(); + } + } + +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java new file mode 100644 index 0000000000..50d55702a6 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +public class TestZkLock { +} From eddeb45addfef458e8b7875b4d8703191c2a74c5 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 11:54:08 -0700 Subject: [PATCH 05/11] Add create streams with lock --- .../org/apache/samza/system/SystemAdmin.java | 9 +++ .../samza/coordinator/CoordinationUtils.java | 2 + .../apache/samza/execution/StreamManager.java | 15 ++++ .../samza/runtime/LocalApplicationRunner.java | 69 ++++++++++++++++++- .../apache/samza/zk/ZkCoordinationUtils.java | 6 ++ .../main/java/org/apache/samza/zk/ZkLock.java | 23 +++++-- .../samza/system/kafka/KafkaSystemAdmin.scala | 12 +++- 7 files changed, 126 insertions(+), 10 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index b18071243d..983c4648d9 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,4 +114,13 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } + + /** + * Check if the stream described by the spec already exists. + * @param streamSpec The spec, or blueprint for the physical stream on the system. + * @return true if stream exists already, false otherwise + */ + default boolean existStream(StreamSpec streamSpec) { + return false; + } } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 150b3d4fcc..32e4f6c4bd 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -41,4 +41,6 @@ public interface CoordinationUtils { LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId Latch getLatch(int size, String latchId); + + Lock getLock(); } diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c6ab036228..c2eb78a7e6 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,4 +75,19 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } + + /** + * Check if the streams described by the specs already exist. + * @param streams A list of stream specs, whose existence we need to check for + * @return true if all the streams exist already, false otherwise + */ + public boolean checkIfStreamsExist(List streams) { + for (StreamSpec spec: streams) { + SystemAdmin systemAdmin = sysAdmins.get(spec.getSystemName()); + if (!systemAdmin.existStream(spec)) { + return false; + } + } + return true; + } } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index b0bfc8a62d..8dfb506551 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -38,6 +38,8 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.LockListener; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -162,7 +164,8 @@ public void run(StreamApplication app) { writePlanJsonFile(plan.getPlanAsJson()); // 2. create the necessary streams - createStreams(plan.getIntermediateStreams()); +// createStreams(plan.getIntermediateStreams()); + createStreamsWithLock(plan.getIntermediateStreams()); // 3. create the StreamProcessors if (plan.getJobConfigs().isEmpty()) { @@ -249,6 +252,44 @@ public void waitForFinish() { } } + + private void createStream(List intStreams) throws Exception { + boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); + if (!streamsExist) { + getStreamManager().createStreams(intStreams); + } + } + + /** + * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. + * If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams. + * All the runner processes will wait till the time they acquire the lock. On acquiring lock, they will check if the streams have already been created. + * If streams exist already, they will unlock and proceed normally. If streams don't exist, they create the streams and then unlock. + * @param intStreams list of intermediate {@link StreamSpec}s + * @throws Exception exception for latch timeout + */ + private void createStreamsWithLock(List intStreams) throws Exception { + if (!intStreams.isEmpty()) { + if (coordinationUtils != null) { + Lock initLock = coordinationUtils.getLock(); + LockListener lockListener = null; + if (coordinationUtils.getClass().getName().equals("org.apache.samza.zk.ZkCoordinationUtils")) { + lockListener = createZkLockListener(intStreams, initLock); + } + initLock.setLockListener(lockListener); + boolean hasLock = false; + while (!hasLock) { + initLock.lock(); + hasLock = initLock.hasLock(); + } + } else { + // each application process will try creating the streams, which + // requires stream creation to be idempotent + getStreamManager().createStreams(intStreams); + } + } + } + /** * Create {@link StreamProcessor} based on {@link StreamApplication} and the config * @param config config @@ -272,4 +313,28 @@ StreamProcessor createStreamProcessor( taskFactory.getClass().getCanonicalName())); } } -} + + private LockListener createZkLockListener(List intStreams, Lock initLock) { + return new LockListener() { + /** + * When the lock is acquired, create the intermediate streams. + */ + @Override + public void onAcquiringLock() { + try { + createStreams(intStreams); + LOG.info("Created intermediate streams successfully!"); + } catch (Exception e) { + onError(); + } + initLock.unlock(); + } + + @Override + public void onError() { + LOG.info("Error while creating streams! Trying again."); + } + }; + } + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index f5dda2e880..aa366d7dea 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -23,6 +23,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.Lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,11 @@ public Latch getLatch(int size, String latchId) { return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils); } + @Override + public Lock getLock() { + return new ZkLock(processorIdStr, zkUtils); + } + // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer public ZkUtils getZkUtils() { return zkUtils; diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index d34f7bf8fd..40567de457 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -29,20 +29,23 @@ import org.slf4j.LoggerFactory; +/** + * Lock primitive for Zookeeper. + */ public class ZkLock implements Lock { - public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class); + public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); + private final static String LOCK_PATH = "lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; - private String currentSubscription = null; private final ZkKeyBuilder keyBuilder; private final IZkDataListener previousProcessorChangeListener; private final Random random = new Random(); + private String currentSubscription = null; private String nodePath = null; private LockListener zkLockListener = null; private AtomicBoolean hasLock; - private final static String LOCK_PATH = "lock"; public ZkLock(String participantId, ZkUtils zkUtils) { this.zkUtils = zkUtils; @@ -55,14 +58,17 @@ public ZkLock(String participantId, ZkUtils zkUtils) { } /** - * Create a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * Create a sequential ephemeral node to acquire the lock. + * If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override public void lock() { try { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); } catch (Exception e) { - zkLockListener.onError(); + if (zkLockListener != null) { + zkLockListener.onError(); + } } List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -72,9 +78,12 @@ public void lock() { } if (index == 0) { + LOG.info("Acquired lock for participant id: {}", participantId); hasLock.set(true); if (zkLockListener != null) { zkLockListener.onAcquiringLock(); + } else { + throw new SamzaException("LockListener unassigned."); } } else { String predecessor = children.get(index - 1); @@ -109,13 +118,13 @@ public void lock() { /** - * Delete the node created to acquire the lock + * Delete the ephemeral sequential node created to acquire the lock. */ @Override public void unlock() { if (nodePath != null) { Boolean status = false; - while (!status && nodePath != null) { + while (!status) { status = zkUtils.getZkClient().delete(nodePath); } hasLock.set(false); diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index af77d5bba7..e8eeb65611 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -496,7 +496,7 @@ class KafkaSystemAdmin( class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) { def this(s: String) = this(s, null) } - + override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps) @@ -519,6 +519,16 @@ class KafkaSystemAdmin( validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) } + /** + * Checks for the existence of a stream defined by its spec, in Kafka. + * @param spec The spec, or blueprint from which the physical stream is created on the system. + */ + override def existStream(spec: StreamSpec): Boolean = { + val topicName = spec.getPhysicalName() + val zkClient = connectZk() + return AdminUtils.topicExists(zkClient, topicName) + } + /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. From b9761874aebb454fb7056f9891963d13310309f9 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 12:29:41 -0700 Subject: [PATCH 06/11] Fix retry --- .../src/main/java/org/apache/samza/coordinator/Lock.java | 2 +- .../org/apache/samza/runtime/LocalApplicationRunner.java | 5 ++--- samza-core/src/main/java/org/apache/samza/zk/ZkLock.java | 4 +++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java index 27e8437775..344de9838e 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java @@ -24,7 +24,7 @@ public interface Lock { /** * Acquires the lock */ - void lock(); + boolean lock(); /** * Releases the lock diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 8dfb506551..d7d19f079b 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -279,8 +279,7 @@ private void createStreamsWithLock(List intStreams) throws Exception initLock.setLockListener(lockListener); boolean hasLock = false; while (!hasLock) { - initLock.lock(); - hasLock = initLock.hasLock(); + hasLock = initLock.lock(); } } else { // each application process will try creating the streams, which @@ -322,7 +321,7 @@ private LockListener createZkLockListener(List intStreams, Lock init @Override public void onAcquiringLock() { try { - createStreams(intStreams); + createStream(intStreams); LOG.info("Created intermediate streams successfully!"); } catch (Exception e) { onError(); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 40567de457..5b09286a54 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -62,7 +62,7 @@ public ZkLock(String participantId, ZkUtils zkUtils) { * If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override - public void lock() { + public boolean lock() { try { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); } catch (Exception e) { @@ -82,6 +82,7 @@ public void lock() { hasLock.set(true); if (zkLockListener != null) { zkLockListener.onAcquiringLock(); + return true; } else { throw new SamzaException("LockListener unassigned."); } @@ -113,6 +114,7 @@ public void lock() { LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); lock(); } + return false; } } From d477a7ff66986f7d2e1279890d87f2b9abad9c8b Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Wed, 9 Aug 2017 17:25:31 -0700 Subject: [PATCH 07/11] Add lock timeout --- .../samza/runtime/LocalApplicationRunner.java | 12 +-- .../apache/samza/zk/ZkCoordinationUtils.java | 2 +- .../main/java/org/apache/samza/zk/ZkLock.java | 102 ++++++------------ 3 files changed, 41 insertions(+), 75 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index d7d19f079b..3890428d1c 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -164,7 +164,6 @@ public void run(StreamApplication app) { writePlanJsonFile(plan.getPlanAsJson()); // 2. create the necessary streams -// createStreams(plan.getIntermediateStreams()); createStreamsWithLock(plan.getIntermediateStreams()); // 3. create the StreamProcessors @@ -253,7 +252,7 @@ public void waitForFinish() { } - private void createStream(List intStreams) throws Exception { + private void createIntermediateStreams(List intStreams) throws Exception { boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); if (!streamsExist) { getStreamManager().createStreams(intStreams); @@ -277,9 +276,10 @@ private void createStreamsWithLock(List intStreams) throws Exception lockListener = createZkLockListener(intStreams, initLock); } initLock.setLockListener(lockListener); - boolean hasLock = false; - while (!hasLock) { - hasLock = initLock.lock(); + boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); + if (!streamsExist) { + initLock.lock(); + lockListener.onAcquiringLock(); } } else { // each application process will try creating the streams, which @@ -321,7 +321,7 @@ private LockListener createZkLockListener(List intStreams, Lock init @Override public void onAcquiringLock() { try { - createStream(intStreams); + createIntermediateStreams(intStreams); LOG.info("Created intermediate streams successfully!"); } catch (Exception e) { onError(); diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index aa366d7dea..5cfc3f31e9 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -63,7 +63,7 @@ public Latch getLatch(int size, String latchId) { @Override public Lock getLock() { - return new ZkLock(processorIdStr, zkUtils); + return new ZkLock(processorIdStr, zkUtils, zkConfig); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java index 5b09286a54..2714955ff2 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java @@ -18,11 +18,15 @@ */ package org.apache.samza.zk; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.List; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.I0Itec.zkclient.IZkDataListener; import org.apache.samza.SamzaException; +import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.Lock; import org.apache.samza.coordinator.LockListener; import org.slf4j.Logger; @@ -36,99 +40,74 @@ public class ZkLock implements Lock { public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); private final static String LOCK_PATH = "lock"; + private final ZkConfig zkConfig; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; - private final IZkDataListener previousProcessorChangeListener; private final Random random = new Random(); - private String currentSubscription = null; private String nodePath = null; private LockListener zkLockListener = null; private AtomicBoolean hasLock; - public ZkLock(String participantId, ZkUtils zkUtils) { + public ZkLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { + this.zkConfig = zkConfig; this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); - this.previousProcessorChangeListener = new PreviousLockProcessorChangeListener(); lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); this.hasLock = new AtomicBoolean(false); } /** - * Create a sequential ephemeral node to acquire the lock. - * If the path of this node has the lowest sequence number, the processor has acquired the lock. + * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. */ @Override public boolean lock() { - try { - nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - } catch (Exception e) { - if (zkLockListener != null) { - zkLockListener.onError(); - } - } - List children = zkUtils.getZkClient().getChildren(lockPath); - int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); - if (children.size() == 0 || index == -1) { - throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); - } - - if (index == 0) { - LOG.info("Acquired lock for participant id: {}", participantId); - hasLock.set(true); - if (zkLockListener != null) { - zkLockListener.onAcquiringLock(); - return true; - } else { - throw new SamzaException("LockListener unassigned."); - } - } else { - String predecessor = children.get(index - 1); - if (!predecessor.equals(currentSubscription)) { - if (currentSubscription != null) { - zkUtils.unsubscribeDataChanges(lockPath + "/" + currentSubscription, - previousProcessorChangeListener); + //Start timer for timeout + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build()); + scheduler.schedule(() -> { + if (!hasLock.get()) { + throw new SamzaException("Timed out while acquiring lock for creating intermediate streams."); } - currentSubscription = predecessor; - zkUtils.subscribeDataChanges(lockPath + "/" + currentSubscription, - previousProcessorChangeListener); + }, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS); + + while (!hasLock.get()) { + List children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); } - /** - * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes - * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't - * exist during subscription, it is not going to get created in the future. - */ - boolean predecessorExists = zkUtils.exists(lockPath + "/" + currentSubscription); - if (predecessorExists) { - LOG.info("Predecessor still exists. Current subscription is valid. Continuing as non-lockholder."); + // Acquires lock when the node has the lowest sequence number and returns. + if (index == 0) { + hasLock.set(true); + LOG.info("Acquired lock for participant id: {}", participantId); + return true; } else { + // Keep trying to acquire the lock try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { Thread.interrupted(); } - LOG.info("Predecessor doesn't exist anymore. Trying to acquire lock again..."); - lock(); + LOG.info("Trying to acquire lock again..."); } - return false; } + return hasLock.get(); } - /** - * Delete the ephemeral sequential node created to acquire the lock. + * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. */ @Override public void unlock() { if (nodePath != null) { - Boolean status = false; - while (!status) { - status = zkUtils.getZkClient().delete(nodePath); - } + zkUtils.getZkClient().delete(nodePath); hasLock.set(false); nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); @@ -147,17 +126,4 @@ public void setLockListener(LockListener listener) { this.zkLockListener = listener; } - class PreviousLockProcessorChangeListener implements IZkDataListener { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - LOG.debug("Data change on path: " + dataPath + " Data: " + data); - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - LOG.info("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."); - lock(); - } - } - } \ No newline at end of file From 97c149b0566bad03e84f6536a06e5289536ddff2 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Fri, 11 Aug 2017 17:02:18 -0700 Subject: [PATCH 08/11] Change lock API --- .../org/apache/samza/system/SystemAdmin.java | 11 +-- .../samza/coordinator/CoordinationUtils.java | 2 +- .../{Lock.java => DistributedLock.java} | 17 ++-- .../samza/coordinator/LockListener.java | 30 ------ .../apache/samza/execution/StreamManager.java | 17 +--- .../samza/runtime/LocalApplicationRunner.java | 95 +++---------------- .../apache/samza/zk/ZkCoordinationUtils.java | 6 +- .../{ZkLock.java => ZkDistributedLock.java} | 57 ++++------- .../java/org/apache/samza/zk/TestZkLock.java | 23 ----- .../samza/system/kafka/KafkaSystemAdmin.scala | 10 -- 10 files changed, 48 insertions(+), 220 deletions(-) rename samza-core/src/main/java/org/apache/samza/coordinator/{Lock.java => DistributedLock.java} (73%) delete mode 100644 samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java rename samza-core/src/main/java/org/apache/samza/zk/{ZkLock.java => ZkDistributedLock.java} (67%) delete mode 100644 samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 983c4648d9..27eef5fb34 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,13 +114,4 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } - - /** - * Check if the stream described by the spec already exists. - * @param streamSpec The spec, or blueprint for the physical stream on the system. - * @return true if stream exists already, false otherwise - */ - default boolean existStream(StreamSpec streamSpec) { - return false; - } -} +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 32e4f6c4bd..23afe4e92a 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -42,5 +42,5 @@ public interface CoordinationUtils { Latch getLatch(int size, String latchId); - Lock getLock(); + DistributedLock getLock(); } diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java similarity index 73% rename from samza-core/src/main/java/org/apache/samza/coordinator/Lock.java rename to samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java index 344de9838e..6972cd95ba 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/Lock.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java @@ -19,20 +19,21 @@ package org.apache.samza.coordinator; -public interface Lock { +import java.util.concurrent.TimeUnit; + + +public interface DistributedLock { /** - * Acquires the lock + * Tries to acquire the lock + * @param timeout Duration of lock acquiring timeout. + * @param unit Time Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. */ - boolean lock(); + boolean lock(long timeout, TimeUnit unit); /** * Releases the lock */ void unlock(); - - boolean hasLock(); - - void setLockListener(LockListener listener); - } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java deleted file mode 100644 index f80da793e6..0000000000 --- a/samza-core/src/main/java/org/apache/samza/coordinator/LockListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.coordinator; - -public interface LockListener { - - public void onAcquiringLock(); - - /** - * Perform the necessary operation when a notification about acquire lock/release lock/error has been received. - */ - public void onError(); - -} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c2eb78a7e6..c290d31b2f 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,19 +75,4 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } - - /** - * Check if the streams described by the specs already exist. - * @param streams A list of stream specs, whose existence we need to check for - * @return true if all the streams exist already, false otherwise - */ - public boolean checkIfStreamsExist(List streams) { - for (StreamSpec spec: streams) { - SystemAdmin systemAdmin = sysAdmins.get(spec.getSystemName()); - if (!systemAdmin.existStream(spec)) { - return false; - } - } - return true; - } -} +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 3890428d1c..9fed19d1ea 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -36,10 +36,7 @@ import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.Latch; -import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.Lock; -import org.apache.samza.coordinator.LockListener; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; @@ -59,10 +56,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); - // Latch id that's used for awaiting the init of application before creating the StreamProcessors - private static final String INIT_LATCH_ID = "init"; - // Latch timeout is set to 10 min - private static final long LATCH_TIMEOUT_MINUTES = 10; + // Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently. + private static final long LOCK_TIMEOUT_SECONDS = 10; private final String uid; private final CoordinationUtils coordinationUtils; @@ -164,7 +159,7 @@ public void run(StreamApplication app) { writePlanJsonFile(plan.getPlanAsJson()); // 2. create the necessary streams - createStreamsWithLock(plan.getIntermediateStreams()); + createStreams(plan.getIntermediateStreams()); // 3. create the StreamProcessors if (plan.getJobConfigs().isEmpty()) { @@ -224,62 +219,26 @@ public void waitForFinish() { } } - /** - * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader - * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes - * stream creation. - * @param intStreams list of intermediate {@link StreamSpec}s - * @throws Exception exception for latch timeout - */ - /* package private */ void createStreams(List intStreams) throws Exception { - if (!intStreams.isEmpty()) { - if (coordinationUtils != null) { - Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID); - LeaderElector leaderElector = coordinationUtils.getLeaderElector(); - leaderElector.setLeaderElectorListener(() -> { - getStreamManager().createStreams(intStreams); - initLatch.countDown(); - }); - leaderElector.tryBecomeLeader(); - initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); - } else { - // each application process will try creating the streams, which - // requires stream creation to be idempotent - getStreamManager().createStreams(intStreams); - } - } - } - - - private void createIntermediateStreams(List intStreams) throws Exception { - boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); - if (!streamsExist) { - getStreamManager().createStreams(intStreams); - } - } /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. * If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams. - * All the runner processes will wait till the time they acquire the lock. On acquiring lock, they will check if the streams have already been created. - * If streams exist already, they will unlock and proceed normally. If streams don't exist, they create the streams and then unlock. + * All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time. + * After stream creation, they will unlock and proceed normally. * @param intStreams list of intermediate {@link StreamSpec}s * @throws Exception exception for latch timeout */ - private void createStreamsWithLock(List intStreams) throws Exception { + private void createStreams(List intStreams) throws Exception { if (!intStreams.isEmpty()) { if (coordinationUtils != null) { - Lock initLock = coordinationUtils.getLock(); - LockListener lockListener = null; - if (coordinationUtils.getClass().getName().equals("org.apache.samza.zk.ZkCoordinationUtils")) { - lockListener = createZkLockListener(intStreams, initLock); - } - initLock.setLockListener(lockListener); - boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams); - if (!streamsExist) { - initLock.lock(); - lockListener.onAcquiringLock(); + DistributedLock initLock = coordinationUtils.getLock(); + boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (hasLock) { + getStreamManager().createStreams(intStreams); + } else { + LOG.error("Timed out while trying to acquire lock"); + coordinationUtils.reset(); + throw new SamzaException(); } } else { // each application process will try creating the streams, which @@ -312,28 +271,4 @@ StreamProcessor createStreamProcessor( taskFactory.getClass().getCanonicalName())); } } - - private LockListener createZkLockListener(List intStreams, Lock initLock) { - return new LockListener() { - /** - * When the lock is acquired, create the intermediate streams. - */ - @Override - public void onAcquiringLock() { - try { - createIntermediateStreams(intStreams); - LOG.info("Created intermediate streams successfully!"); - } catch (Exception e) { - onError(); - } - initLock.unlock(); - } - - @Override - public void onError() { - LOG.info("Error while creating streams! Trying again."); - } - }; - } - } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index 5cfc3f31e9..b84d2a4626 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -23,7 +23,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.Lock; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +62,8 @@ public Latch getLatch(int size, String latchId) { } @Override - public Lock getLock() { - return new ZkLock(processorIdStr, zkUtils, zkConfig); + public DistributedLock getLock() { + return new ZkDistributedLock(processorIdStr, zkUtils, zkConfig); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java similarity index 67% rename from samza-core/src/main/java/org/apache/samza/zk/ZkLock.java rename to samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java index 2714955ff2..4b7b138392 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java @@ -18,65 +18,54 @@ */ package org.apache.samza.zk; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.List; import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.samza.SamzaException; import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.Lock; -import org.apache.samza.coordinator.LockListener; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Lock primitive for Zookeeper. + * Distributed lock primitive for Zookeeper. */ -public class ZkLock implements Lock { +public class ZkDistributedLock implements DistributedLock { - public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class); - private final static String LOCK_PATH = "lock"; - private final ZkConfig zkConfig; + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); + private final static String LOCK_PATH = "distributed_lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; private final Random random = new Random(); private String nodePath = null; - private LockListener zkLockListener = null; - private AtomicBoolean hasLock; - public ZkLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { - this.zkConfig = zkConfig; + public ZkDistributedLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); - this.hasLock = new AtomicBoolean(false); } /** * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return true if lock is acquired successfully, false if it times out. */ @Override - public boolean lock() { + public boolean lock(long timeout, TimeUnit unit) { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); //Start timer for timeout - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build()); - scheduler.schedule(() -> { - if (!hasLock.get()) { - throw new SamzaException("Timed out while acquiring lock for creating intermediate streams."); - } - }, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS); + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - while (!hasLock.get()) { + while (true) { List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -85,11 +74,14 @@ public boolean lock() { } // Acquires lock when the node has the lowest sequence number and returns. if (index == 0) { - hasLock.set(true); LOG.info("Acquired lock for participant id: {}", participantId); return true; } else { // Keep trying to acquire the lock + long currentTime = System.currentTimeMillis(); + if ((currentTime - startTime) >= lockTimeout) { + return false; + } try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { @@ -98,7 +90,6 @@ public boolean lock() { LOG.info("Trying to acquire lock again..."); } } - return hasLock.get(); } /** @@ -108,22 +99,10 @@ public boolean lock() { public void unlock() { if (nodePath != null) { zkUtils.getZkClient().delete(nodePath); - hasLock.set(false); nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); } else { - LOG.info("Ephemeral lock node doesn't exist"); + LOG.error("Ephemeral lock node you want to delete doesn't exist"); } } - - @Override - public boolean hasLock() { - return hasLock.get(); - } - - @Override - public void setLockListener(LockListener listener) { - this.zkLockListener = listener; - } - } \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java deleted file mode 100644 index 50d55702a6..0000000000 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLock.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.zk; - -public class TestZkLock { -} diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index e8eeb65611..983c8d95b5 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -519,16 +519,6 @@ class KafkaSystemAdmin( validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) } - /** - * Checks for the existence of a stream defined by its spec, in Kafka. - * @param spec The spec, or blueprint from which the physical stream is created on the system. - */ - override def existStream(spec: StreamSpec): Boolean = { - val topicName = spec.getPhysicalName() - val zkClient = connectZk() - return AdminUtils.topicExists(zkClient, topicName) - } - /** * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. From 3587681c0b41c2a1894c825a13b4cd2953889228 Mon Sep 17 00:00:00 2001 From: PawasChhokra Date: Fri, 11 Aug 2017 18:50:17 -0700 Subject: [PATCH 09/11] Address review --- .../org/apache/samza/system/SystemAdmin.java | 2 +- .../samza/coordinator/CoordinationUtils.java | 2 +- .../apache/samza/execution/StreamManager.java | 2 +- .../samza/runtime/LocalApplicationRunner.java | 33 ++++++++++++++----- .../apache/samza/zk/ZkCoordinationUtils.java | 4 +-- .../apache/samza/zk/ZkDistributedLock.java | 16 +++------ 6 files changed, 34 insertions(+), 25 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 27eef5fb34..b18071243d 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -114,4 +114,4 @@ default boolean createStream(StreamSpec streamSpec) { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } -} \ No newline at end of file +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 23afe4e92a..1e4acd79d0 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -42,5 +42,5 @@ public interface CoordinationUtils { Latch getLatch(int size, String latchId); - DistributedLock getLock(); + DistributedLock getLock(String initLockPath); } diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index c290d31b2f..c6ab036228 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -75,4 +75,4 @@ Map getStreamPartitionCounts(String systemName, Set str return streamToPartitionCount; } -} \ No newline at end of file +} diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 9fed19d1ea..48cc460d05 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -21,13 +21,16 @@ import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; @@ -219,6 +222,17 @@ public void waitForFinish() { } } + /** + * Generates a unique lock ID which is consistent for all processors within the same application lifecycle. + * Each {@code StreamSpec} has an ID that is unique and is used for hashcode computation. + * @param intStreams list of {@link StreamSpec}s + * @return lock ID + */ + private String generateLockId(List intStreams) { + return String.valueOf(Objects.hashCode(intStreams.stream() + .map(StreamSpec::getId) + .collect(Collectors.toList()))); + } /** * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. @@ -226,19 +240,20 @@ public void waitForFinish() { * All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time. * After stream creation, they will unlock and proceed normally. * @param intStreams list of intermediate {@link StreamSpec}s - * @throws Exception exception for latch timeout */ - private void createStreams(List intStreams) throws Exception { + private void createStreams(List intStreams) { if (!intStreams.isEmpty()) { if (coordinationUtils != null) { - DistributedLock initLock = coordinationUtils.getLock(); - boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (hasLock) { - getStreamManager().createStreams(intStreams); - } else { - LOG.error("Timed out while trying to acquire lock"); + DistributedLock initLock = coordinationUtils.getLock(generateLockId(intStreams)); + try { + boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (hasLock) { + getStreamManager().createStreams(intStreams); + } else { + LOG.error("Timed out while trying to acquire lock.", new TimeoutException()); + } + } finally { coordinationUtils.reset(); - throw new SamzaException(); } } else { // each application process will try creating the streams, which diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index b84d2a4626..3fd7b75a60 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -62,8 +62,8 @@ public Latch getLatch(int size, String latchId) { } @Override - public DistributedLock getLock() { - return new ZkDistributedLock(processorIdStr, zkUtils, zkConfig); + public DistributedLock getLock(String initLockPath) { + return new ZkDistributedLock(processorIdStr, zkUtils, initLockPath); } // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java index 4b7b138392..71723739c6 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java @@ -22,7 +22,6 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.samza.SamzaException; -import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +33,6 @@ public class ZkDistributedLock implements DistributedLock { public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); - private final static String LOCK_PATH = "distributed_lock"; private final ZkUtils zkUtils; private final String lockPath; private final String participantId; @@ -42,11 +40,11 @@ public class ZkDistributedLock implements DistributedLock { private final Random random = new Random(); private String nodePath = null; - public ZkDistributedLock(String participantId, ZkUtils zkUtils, ZkConfig zkConfig) { + public ZkDistributedLock(String participantId, ZkUtils zkUtils, String initLockPath) { this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = this.zkUtils.getKeyBuilder(); - lockPath = String.format("%s/%s", keyBuilder.getRootPath(), LOCK_PATH); + lockPath = String.format("%s/%s", keyBuilder.getRootPath(), initLockPath); zkUtils.makeSurePersistentPathsExists(new String[] {lockPath}); } @@ -65,7 +63,7 @@ public boolean lock(long timeout, TimeUnit unit) { long startTime = System.currentTimeMillis(); long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - while (true) { + while ((System.currentTimeMillis() - startTime) < lockTimeout) { List children = zkUtils.getZkClient().getChildren(lockPath); int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); @@ -77,11 +75,6 @@ public boolean lock(long timeout, TimeUnit unit) { LOG.info("Acquired lock for participant id: {}", participantId); return true; } else { - // Keep trying to acquire the lock - long currentTime = System.currentTimeMillis(); - if ((currentTime - startTime) >= lockTimeout) { - return false; - } try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { @@ -90,6 +83,7 @@ public boolean lock(long timeout, TimeUnit unit) { LOG.info("Trying to acquire lock again..."); } } + return false; } /** @@ -102,7 +96,7 @@ public void unlock() { nodePath = null; LOG.info("Ephemeral lock node deleted. Unlocked!"); } else { - LOG.error("Ephemeral lock node you want to delete doesn't exist"); + LOG.warn("Ephemeral lock node you want to delete doesn't exist"); } } } \ No newline at end of file From befdc0b330eac198d86f0c6b0cc6b343ebf4cfde Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 14 Aug 2017 15:48:21 -0700 Subject: [PATCH 10/11] smlall adjustments --- .../samza/coordinator/CoordinationUtils.java | 34 +++++++++---------- .../samza/runtime/LocalApplicationRunner.java | 4 ++- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 494efab2fe..85bfdc0843 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -1,21 +1,21 @@ /* - - * Licensed to the Apache Software Foundation (ASF) under one - - * or more contributor license agreements. See the NOTICE file - - * distributed with this work for additional information - - * regarding copyright ownership. The ASF licenses this file - - * to you under the Apache License, Version 2.0 (the - - * "License"); you may not use this file except in compliance - - * with the License. You may obtain a copy of the License at - - * - - * http://www.apache.org/licenses/LICENSE-2.0 - - * - - * Unless required by applicable law or agreed to in writing, - - * software distributed under the License is distributed on an - - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - * KIND, either express or implied. See the License for the - - * specific language governing permissions and limitations - - * under the License. - - */ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.coordinator; import org.apache.samza.SamzaException; diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index efedf08b46..0b113b601c 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfig; @@ -224,7 +225,8 @@ private void createStreams(List intStreams) { DistributedLock initLock = null; try { - CoordinationUtils coordinationUtils = CoordinationUtils.getCoordinationUtils("APP_ID", uid, config); + CoordinationUtils coordinationUtils = CoordinationUtils.getCoordinationUtils( + new ApplicationConfig(config).getGlobalAppId(), uid, config); initLock = coordinationUtils.getLock(generateLockId(intStreams)); } catch (SamzaException e) { LOG.warn("Coordination utils are not available.", e); From 2852cad70da82a6c510d1a69af5559e8ae9e3772 Mon Sep 17 00:00:00 2001 From: Boris Shkolnik Date: Mon, 14 Aug 2017 16:02:24 -0700 Subject: [PATCH 11/11] cleanup --- .../java/org/apache/samza/coordinator/LeaderElector.java | 2 -- .../main/java/org/apache/samza/zk/ZkLeaderElector.java | 6 ------ .../main/java/org/apache/samza/zk/ZkProcessorLatch.java | 6 ------ .../main/scala/org/apache/samza/coordinator/Latch.java | 2 -- .../java/org/apache/samza/zk/TestZkLeaderElector.java | 3 +-- .../test/java/org/apache/samza/zk/TestZkNamespace.java | 8 ++++---- .../java/org/apache/samza/zk/TestZkProcessorLatch.java | 2 +- 7 files changed, 6 insertions(+), 23 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java index 5283a7b97a..c624f835d5 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java @@ -56,6 +56,4 @@ public interface LeaderElector { * @return True, if the caller is the current leader. False, otherwise */ boolean amILeader(); - - void close(); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index b182214ef7..97430cbde8 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -169,12 +169,6 @@ private String zLog(String logMessage) { return String.format("[Processor-%s] %s", processorIdStr, logMessage); } - @Override - public void close() { - if (zkUtils != null) - zkUtils.close(); - } - // Only by non-leaders class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener { diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java index e295a6c22a..decdd7d6ee 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java @@ -63,10 +63,4 @@ public void countDown() { String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId); LOGGER.debug("ZKProcessorLatch countDown created " + path); } - - @Override - public void close() { - if (zkUtils != null) - zkUtils.close(); - } } diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java index 4fd4e1e4f5..5ca91380f9 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java @@ -30,6 +30,4 @@ public interface Latch { void await(long timeout, TimeUnit tu) throws TimeoutException; void countDown(); - - void close(); } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index c5b99a871e..3ff91757d4 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -433,8 +433,7 @@ public void testAmILeader() { } private ZkUtils getZkUtilsWithNewClient() { - ZkClient zkClient = ZkCoordinationUtils - .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); + ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, zkClient, diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java index 459c71602a..3ce203e1b7 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java @@ -90,7 +90,7 @@ private void testDoNotFailIfNameSpacePresent(String zkNameSpace) { String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace; createNamespace(zkNameSpace); initZk(zkConnect); - ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); zkClient.createPersistent("/test"); zkClient.createPersistent("/test/test1"); @@ -106,7 +106,7 @@ public void testValidateFailZkNameSpace1LevelPath() { try { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace"; initZk(zkConnect); - ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); Assert.fail("1.Should fail with exception, because namespace doesn't exist"); } catch (SamzaException e) { // expected @@ -120,7 +120,7 @@ public void testValidateFailZkNameSpace2LevelPath() { try { String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz"; initZk(zkConnect); - ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); Assert.fail("2.Should fail with exception, because namespace doesn't exist"); } catch (SamzaException e) { // expected @@ -134,7 +134,7 @@ public void testValidateFailZkNameSpaceEmptyPath() { // should succeed, because no namespace provided String zkConnect = "127.0.0.1:" + zkServer.getPort() + ""; initZk(zkConnect); - ZkCoordinationUtils.validateZkNameSpace(zkConnect, zkClient); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); tearDownZk(); } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index 5728b07ead..b2a5533213 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -215,7 +215,7 @@ public void testLatchExpires() { } private ZkUtils getZkUtilsWithNewClient(String processorId) { - ZkClient zkClient = ZkCoordinationUtils + ZkClient zkClient = ZkCoordinationServiceFactory .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER,