diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index b0bfc8a62d..588e657f0d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.samza.SamzaException; @@ -57,13 +58,12 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); - // Latch id that's used for awaiting the init of application before creating the StreamProcessors - private static final String INIT_LATCH_ID = "init"; + private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData"; // Latch timeout is set to 10 min private static final long LATCH_TIMEOUT_MINUTES = 10; + private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000; private final String uid; - private final CoordinationUtils coordinationUtils; private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); @@ -122,9 +122,6 @@ private void shutdownAndNotify() { } } - if (coordinationUtils != null) { - coordinationUtils.reset(); - } shutdownLatch.countDown(); } } @@ -132,7 +129,6 @@ private void shutdownAndNotify() { public LocalApplicationRunner(Config config) { super(config); uid = UUID.randomUUID().toString(); - coordinationUtils = createCoordinationUtils(); } @Override @@ -159,10 +155,14 @@ public void run(StreamApplication 
app) { try { // 1. initialize and plan ExecutionPlan plan = getExecutionPlan(app); - writePlanJsonFile(plan.getPlanAsJson()); + + String executionPlanJson = plan.getPlanAsJson(); + writePlanJsonFile(executionPlanJson); // 2. create the necessary streams - createStreams(plan.getIntermediateStreams()); + // TODO: System generated intermediate streams should have robust naming scheme. Refer JIRA-1391 + String planId = String.valueOf(executionPlanJson.hashCode()); + createStreams(planId, plan.getIntermediateStreams()); // 3. create the StreamProcessors if (plan.getJobConfigs().isEmpty()) { @@ -216,7 +216,8 @@ public void waitForFinish() { // TODO: we will need a better way to package the configs with application runner if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) { ApplicationConfig appConfig = new ApplicationConfig(config); - return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config); + return new ZkCoordinationServiceFactory().getCoordinationService( + appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config); } else { return null; } @@ -227,20 +228,32 @@ public void waitForFinish() { * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes * stream creation. 
+ * @param planId a unique identifier representing the plan used for coordination purpose * @param intStreams list of intermediate {@link StreamSpec}s - * @throws Exception exception for latch timeout + * @throws TimeoutException exception for latch timeout */ - /* package private */ void createStreams(List intStreams) throws Exception { + /* package private */ void createStreams(String planId, List intStreams) throws TimeoutException { if (!intStreams.isEmpty()) { + // Move the scope of coordination utils within stream creation to address long idle connection problem. + // Refer SAMZA-1385 for more details + CoordinationUtils coordinationUtils = createCoordinationUtils(); if (coordinationUtils != null) { - Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID); - LeaderElector leaderElector = coordinationUtils.getLeaderElector(); - leaderElector.setLeaderElectorListener(() -> { - getStreamManager().createStreams(intStreams); - initLatch.countDown(); - }); - leaderElector.tryBecomeLeader(); - initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); + Latch initLatch = coordinationUtils.getLatch(1, planId); + + try { + // check if the processor needs to go through leader election and stream creation + if (shouldContestInElectionForStreamCreation(initLatch)) { + LeaderElector leaderElector = coordinationUtils.getLeaderElector(); + leaderElector.setLeaderElectorListener(() -> { + getStreamManager().createStreams(intStreams); + initLatch.countDown(); + }); + leaderElector.tryBecomeLeader(); + initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES); + } + } finally { + coordinationUtils.reset(); + } } else { // each application process will try creating the streams, which // requires stream creation to be idempotent @@ -272,4 +285,31 @@ StreamProcessor createStreamProcessor( taskFactory.getClass().getCanonicalName())); } } + + /** + * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying + * the 
coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election. + * It creates the following issues whenever a new process joins after the ephemeral node is gone. + * 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle + * 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems. + * As an interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the + * application lifecycle using {@link Latch#await(long, TimeUnit)} + * + * @param streamCreationLatch latch used for stream creation + * @return true if processor needs to be part of election + * false otherwise + */ + private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) { + boolean eligibleForElection = true; + + try { + streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS); + // case we didn't time out suggesting that latch already exists + eligibleForElection = false; + } catch (TimeoutException e) { + LOG.info("Timed out waiting for the latch! 
Going to enter leader election section to create streams"); + } + + return eligibleForElection; + } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 85e3b4ab55..08d826ec60 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -22,6 +22,7 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; @@ -30,9 +31,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class); + private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = "%s/%s-%s-coordinationData"; + private static final String DEFAULT_JOB_ID = "1"; + private static final String DEFAULT_JOB_NAME = "defaultJob"; /** * Method to instantiate an implementation of JobCoordinator @@ -50,8 +55,21 @@ public JobCoordinator getJobCoordinator(Config config) { private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) { ZkConfig zkConfig = new ZkConfig(config); - ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); + ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config)); ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); } + + private String getJobCoordinationZkPath(Config config) { 
+ JobConfig jobConfig = new JobConfig(config); + String appId = new ApplicationConfig(config).getGlobalAppId(); + String jobName = jobConfig.getName().isDefined() + ? jobConfig.getName().get() + : DEFAULT_JOB_NAME; + String jobId = jobConfig.getJobId().isDefined() + ? jobConfig.getJobId().get() + : DEFAULT_JOB_ID; + + return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId); + } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java index decdd7d6ee..166c6276c2 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.samza.coordinator.Latch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +54,14 @@ public ZkProcessorLatch(int size, String latchId, String participantId, ZkUtils } @Override - public void await(long timeout, TimeUnit timeUnit) { - zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout); + public void await(long timeout, TimeUnit timeUnit) throws TimeoutException { + // waitUntilExists signals timeout by returning false as opposed to throwing exception. 
We internally need to map + // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface + boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout); + + if (!targetPathExists) { + throw new TimeoutException("Timed out waiting for the targetPath"); + } } @Override diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index a04bd3b06f..6c7827e9ac 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -19,6 +19,15 @@ package org.apache.samza.runtime; +import com.google.common.collect.ImmutableList; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; @@ -38,31 +47,31 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; 
-import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class TestLocalApplicationRunner { + private static final String PLAN_JSON = "{" + + "\"jobs\":[{" + + "\"jobName\":\"test-application\"," + + "\"jobId\":\"1\"," + + "\"operatorGraph\":{" + + "\"intermediateStreams\":{%s}," + + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}"; + private static final String STREAM_SPEC_JSON_FORMAT = "\"%s\":{" + + "\"streamSpec\":{" + + "\"id\":\"%s\"," + + "\"systemName\":\"%s\"," + + "\"physicalName\":\"%s\"," + + "\"partitionCount\":2}," + + "\"sourceJobs\":[\"test-app\"]," + + "\"targetJobs\":[\"test-target-app\"]},"; + @Test public void testStreamCreation() throws Exception { Map config = new HashMap<>(); @@ -180,8 +189,10 @@ public boolean amILeader() { @Override public void await(long timeout, TimeUnit tu) throws TimeoutException { - // in this test, latch is released before wait - assertTrue(done); + // in this test, latch is released after countDown is invoked + if (!done) { + throw new TimeoutException("timed out waiting for the target path"); + } } @Override @@ -343,4 +354,80 @@ public String getPlanAsJson() assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish); } + /** + * A test case to verify if the plan results in different hash if there is change in topological sort order. + * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases. 
+ */ + @Test + public void testPlanIdWithShuffledStreamSpecs() { + List streamSpecs = ImmutableList.of( + new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); + + List shuffledStreamSpecs = ImmutableList.of( + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + + assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle, + getExecutionPlanId(shuffledStreamSpecs)); + } + + /** + * A test case to verify if the plan results in same hash in case of same plan. + * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases. + */ + @Test + public void testGeneratePlanIdWithSameStreamSpecs() { + List streamSpecs = ImmutableList.of( + new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + String planIdForFirstAttempt = getExecutionPlanId(streamSpecs); + String planIdForSecondAttempt = getExecutionPlanId(streamSpecs); + + assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt); + assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt); + } + + /** + * A test case to verify plan results in different hash in case of different intermediate stream. + * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases. 
+ */ + @Test + public void testGeneratePlanIdWithDifferentStreamSpecs() { + List streamSpecs = ImmutableList.of( + new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-2", "stream-2", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); + + List updatedStreamSpecs = ImmutableList.of( + new StreamSpec("test-stream-1", "stream-1", "testStream"), + new StreamSpec("test-stream-4", "stream-4", "testStream"), + new StreamSpec("test-stream-3", "stream-3", "testStream")); + + assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle, + getExecutionPlanId(updatedStreamSpecs)); + } + + private String getExecutionPlanId(List updatedStreamSpecs) { + String intermediateStreamJson = updatedStreamSpecs.stream() + .map(this::streamSpecToJson) + .collect(Collectors.joining(",")); + + int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode(); + + return String.valueOf(planId); + } + + private String streamSpecToJson(StreamSpec streamSpec) { + return String.format(STREAM_SPEC_JSON_FORMAT, + streamSpec.getId(), + streamSpec.getId(), + streamSpec.getSystemName(), + streamSpec.getPhysicalName()); + } }