Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
Expand Down Expand Up @@ -57,13 +58,12 @@
public class LocalApplicationRunner extends AbstractApplicationRunner {

private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
// Latch id that's used for awaiting the init of application before creating the StreamProcessors
private static final String INIT_LATCH_ID = "init";
private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData";
// Latch timeout is set to 10 min
private static final long LATCH_TIMEOUT_MINUTES = 10;
private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000;

private final String uid;
private final CoordinationUtils coordinationUtils;
private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final AtomicInteger numProcessorsToStart = new AtomicInteger();
Expand Down Expand Up @@ -122,17 +122,13 @@ private void shutdownAndNotify() {
}
}

if (coordinationUtils != null) {
coordinationUtils.reset();
}
shutdownLatch.countDown();
}
}

public LocalApplicationRunner(Config config) {
super(config);
uid = UUID.randomUUID().toString();
coordinationUtils = createCoordinationUtils();
}

@Override
Expand All @@ -159,10 +155,14 @@ public void run(StreamApplication app) {
try {
// 1. initialize and plan
ExecutionPlan plan = getExecutionPlan(app);
writePlanJsonFile(plan.getPlanAsJson());

String executionPlanJson = plan.getPlanAsJson();
writePlanJsonFile(executionPlanJson);

// 2. create the necessary streams
createStreams(plan.getIntermediateStreams());
// TODO: System generated intermediate streams should have robust naming scheme. Refer JIRA-1391
String planId = String.valueOf(executionPlanJson.hashCode());
createStreams(planId, plan.getIntermediateStreams());

// 3. create the StreamProcessors
if (plan.getJobConfigs().isEmpty()) {
Expand Down Expand Up @@ -216,7 +216,8 @@ public void waitForFinish() {
// TODO: we will need a better way to package the configs with application runner
if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
ApplicationConfig appConfig = new ApplicationConfig(config);
return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config);
return new ZkCoordinationServiceFactory().getCoordinationService(
appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config);
} else {
return null;
}
Expand All @@ -227,20 +228,32 @@ public void waitForFinish() {
* If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader
* will create the streams. All the runner processes will wait on the latch that is released after the leader finishes
* stream creation.
* @param planId a unique identifier representing the plan used for coordination purpose
* @param intStreams list of intermediate {@link StreamSpec}s
* @throws Exception exception for latch timeout
* @throws TimeoutException exception for latch timeout
*/
/* package private */ void createStreams(List<StreamSpec> intStreams) throws Exception {
/* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
if (!intStreams.isEmpty()) {
// Move the scope of coordination utils within stream creation to address long idle connection problem.
// Refer SAMZA-1385 for more details
CoordinationUtils coordinationUtils = createCoordinationUtils();
if (coordinationUtils != null) {
Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID);
LeaderElector leaderElector = coordinationUtils.getLeaderElector();
leaderElector.setLeaderElectorListener(() -> {
getStreamManager().createStreams(intStreams);
initLatch.countDown();
});
leaderElector.tryBecomeLeader();
initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
Latch initLatch = coordinationUtils.getLatch(1, planId);

try {
// check if the processor needs to go through leader election and stream creation
if (shouldContestInElectionForStreamCreation(initLatch)) {
LeaderElector leaderElector = coordinationUtils.getLeaderElector();
leaderElector.setLeaderElectorListener(() -> {
getStreamManager().createStreams(intStreams);
initLatch.countDown();
});
leaderElector.tryBecomeLeader();
initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
}
} finally {
coordinationUtils.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for adding this here 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather have a close() method in each facility separate. This gives more control over the life cycle of the utils. But we don't have to change it now, since we will change it later.

}
} else {
// each application process will try creating the streams, which
// requires stream creation to be idempotent
Expand Down Expand Up @@ -272,4 +285,31 @@ StreamProcessor createStreamProcessor(
taskFactory.getClass().getCanonicalName()));
}
}

/**
* In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying
* the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election.
* It creates the following issues whenever a new process joins after the ephemeral node is gone.
* 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle
* 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems.
* As in interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the
* application lifecycle using {@link Latch#await(long, TimeUnit)}
*
* @param streamCreationLatch latch used for stream creation
* @return true if processor needs to be part of election
* false otherwise
*/
private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) {
boolean eligibleForElection = true;

try {
streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of the latch interface is super confusing because it blocks on await with 2 different timeouts - LEADER_ELECTION_WAIT_TIME and LATCH_TIMEOUT_MINUTES . Iiuc, we do this due to a lack of interface support to test the status of the latch. please correct me if I am wrong.

I find this acceptable only because this is a stop-gap solution until we fix the coordinatorUtils interface.

// case we didn't time out suggesting that latch already exists
eligibleForElection = false;
} catch (TimeoutException e) {
LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams");
}

return eligibleForElection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
Expand All @@ -30,9 +31,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {

private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = "%s/%s-%s-coordinationData";
private static final String DEFAULT_JOB_ID = "1";
private static final String DEFAULT_JOB_NAME = "defaultJob";

/**
* Method to instantiate an implementation of JobCoordinator
Expand All @@ -50,8 +55,21 @@ public JobCoordinator getJobCoordinator(Config config) {

private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
ZkConfig zkConfig = new ZkConfig(config);
ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config));
ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
}

private String getJobCoordinationZkPath(Config config) {
JobConfig jobConfig = new JobConfig(config);
String appId = new ApplicationConfig(config).getGlobalAppId();
String jobName = jobConfig.getName().isDefined()
? jobConfig.getName().get()
: DEFAULT_JOB_NAME;
String jobId = jobConfig.getJobId().isDefined()
? jobConfig.getJobId().get()
: DEFAULT_JOB_ID;

return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;
import org.apache.samza.coordinator.Latch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,8 +54,14 @@ public ZkProcessorLatch(int size, String latchId, String participantId, ZkUtils
}

@Override
public void await(long timeout, TimeUnit timeUnit) {
zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
// waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
// the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);

if (!targetPathExists) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

throw new TimeoutException("Timed out waiting for the targetPath");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@

package org.apache.samza.runtime;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
Expand All @@ -38,31 +47,31 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;


public class TestLocalApplicationRunner {

private static final String PLAN_JSON = "{"
+ "\"jobs\":[{"
+ "\"jobName\":\"test-application\","
+ "\"jobId\":\"1\","
+ "\"operatorGraph\":{"
+ "\"intermediateStreams\":{%s},"
+ "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
private static final String STREAM_SPEC_JSON_FORMAT = "\"%s\":{"
+ "\"streamSpec\":{"
+ "\"id\":\"%s\","
+ "\"systemName\":\"%s\","
+ "\"physicalName\":\"%s\","
+ "\"partitionCount\":2},"
+ "\"sourceJobs\":[\"test-app\"],"
+ "\"targetJobs\":[\"test-target-app\"]},";

@Test
public void testStreamCreation() throws Exception {
Map<String, String> config = new HashMap<>();
Expand Down Expand Up @@ -180,8 +189,10 @@ public boolean amILeader() {
@Override
public void await(long timeout, TimeUnit tu)
throws TimeoutException {
// in this test, latch is released before wait
assertTrue(done);
// in this test, latch is released after countDown is invoked
if (!done) {
throw new TimeoutException("timed out waiting for the target path");
}
}

@Override
Expand Down Expand Up @@ -343,4 +354,80 @@ public String getPlanAsJson()
assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
}

/**
* A test case to verify if the plan results in different hash if there is change in topological sort order.
* Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
*/
@Test
public void testPlanIdWithShuffledStreamSpecs() {
List<StreamSpec> streamSpecs = ImmutableList.of(
new StreamSpec("test-stream-1", "stream-1", "testStream"),
new StreamSpec("test-stream-2", "stream-2", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));
String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);

List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(
new StreamSpec("test-stream-2", "stream-2", "testStream"),
new StreamSpec("test-stream-1", "stream-1", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));

assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle,
getExecutionPlanId(shuffledStreamSpecs));
}

/**
* A test case to verify if the plan results in same hash in case of same plan.
* Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
*/
@Test
public void testGeneratePlanIdWithSameStreamSpecs() {
List<StreamSpec> streamSpecs = ImmutableList.of(
new StreamSpec("test-stream-1", "stream-1", "testStream"),
new StreamSpec("test-stream-2", "stream-2", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));
String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);

assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
}

/**
* A test case to verify plan results in different hash in case of different intermediate stream.
* Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
*/
@Test
public void testGeneratePlanIdWithDifferentStreamSpecs() {
List<StreamSpec> streamSpecs = ImmutableList.of(
new StreamSpec("test-stream-1", "stream-1", "testStream"),
new StreamSpec("test-stream-2", "stream-2", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));
String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);

List<StreamSpec> updatedStreamSpecs = ImmutableList.of(
new StreamSpec("test-stream-1", "stream-1", "testStream"),
new StreamSpec("test-stream-4", "stream-4", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));

assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle,
getExecutionPlanId(updatedStreamSpecs));
}

private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
String intermediateStreamJson = updatedStreamSpecs.stream()
.map(this::streamSpecToJson)
.collect(Collectors.joining(","));

int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();

return String.valueOf(planId);
}

private String streamSpecToJson(StreamSpec streamSpec) {
return String.format(STREAM_SPEC_JSON_FORMAT,
streamSpec.getId(),
streamSpec.getId(),
streamSpec.getSystemName(),
streamSpec.getPhysicalName());
}
}