From 58ac719b262dac50d1fb869949819446f8a2d240 Mon Sep 17 00:00:00 2001 From: Cameron Lee Date: Wed, 9 Oct 2019 10:45:10 -0700 Subject: [PATCH] SAMZA-2343: Remove original split deployment code --- .../versioned/jobs/split-deployment.md | 59 ------------------- .../org/apache/samza/config/JobConfig.java | 19 ------ .../samza/job/local/ProcessJobFactory.scala | 6 -- .../TestContainerProcessManager.java | 25 -------- .../apache/samza/config/TestJobConfig.java | 17 ------ .../samza/job/TestShellCommandBuilder.scala | 6 +- samza-shell/src/main/bash/run-class.sh | 42 +++---------- .../job/yarn/YarnClusterResourceManager.java | 29 ++------- .../org/apache/samza/job/yarn/YarnJob.scala | 23 +------- 9 files changed, 17 insertions(+), 209 deletions(-) delete mode 100644 docs/learn/documentation/versioned/jobs/split-deployment.md diff --git a/docs/learn/documentation/versioned/jobs/split-deployment.md b/docs/learn/documentation/versioned/jobs/split-deployment.md deleted file mode 100644 index fa3e7aeeb5..0000000000 --- a/docs/learn/documentation/versioned/jobs/split-deployment.md +++ /dev/null @@ -1,59 +0,0 @@ ---- -layout: page -title: Separating Samza Framework and Jobs Deployment ---- - - - -### Motivation -Currently all Samza jobs are deployed as a single unit/package which combines all the Samza libraries, user code and configs together. Typically in a large organization the team that manages the Samza cluster is not the same as the teams that are running applications on top of Samza. In this case, the current way of deployment presents two major problems: - -* **Samza software releases**: -Every time Samza team releases a new version (for example a bug fix), the only way to deploy it is to rebuild all users packages and redeploy them. It would be much more efficient if the team could release the Samza framework separately, at its own cadence, and a simple job restart would pick up the new version. - -* **Packages incompatibilities**: -If both Samza and a job depend on the same software, but on different (especially backward incompatible) versions, they cannot be released together, because it will most likely cause some runtime issue. Ideally, each one of them would load the packages it needs separately. -NOTE.This problem is not addressed here. - -To address the first problem, we separate the deployment of the Samza framework from user jobs by defining two deployable units: - -* **Samza framework** - This contains Samza libraries only, and is deployed separately to all the machines in a cluster. -* **User's job** - This contains user code only, and uses the pre-deployed Samza framework to run. - -Split deployment allows upgrading the Samza framework without forcing developers to explicitly upgrade their running applications. It also allows different versions of Samza framework with simple config changes. This means we can support canary, upgrade and rollback scenarios commonly -required in organizations that run tens or hundreds of jobs. - -### Deployment sequence - -#### Pre-requisite for split deployment -Each deployment will now consist of two separate packages:

- -1. **Samza framework** - This includes all Samza libraries and scripts, such as samza-api, samza-core, samza-log4j, samza-kafka, samza-yarn, samza-kv, samza-kv-inmemory, samza-kv-rocksdb, samza-shell, samza-hdfs and all their dependencies. -2. **User's job** - This includes the job package: all user code for the StreamTask implementation, configs, and other libraries required by the job. The job's package should depend only samza-api and no other Samza libraries. The package won't be able to start by itself. In order to start, it will need to use the Samza framework. - -#### Deployment steps -To run a job in split deployment mode: - -1. **Deploy the framework**: -The Samza framework package should be deployed to ALL the machines of a cluster into a predefined, fixed location. This could be done by merely copying the jars, or creating a meta package that would deploy all of them. Let's assume that 'samza-framework' package is installed into the '/.../samza-fwk/0.12.0' directory. - -2. **Create symbolic link**: -A symbolic link needs to be created for the **stable** version of the framework to point to the framework location, e.g.: {% highlight bash %} ln -s /.../samza-fwk/0.12.0 /.../samza-fwk/STABLE' {% endhighlight %} - -3. **Deploy user job**: -In the job's config, the following property is required to enable split deployment, e.g. for Samza framework path at '/.../samza-fwk': {% highlight jproperties %} samza.fwk.path=/.../samza-fwk {% endhighlight %} By default Samza will look for the **stable** link inside the folder to find the framework. You can also override the version by configuring: {% highlight jproperties %} samza.fwk.version=0.11.1 {% endhighlight %} In this case Samza will pick '/.../samza-fwk/0.11.1' as the framework location. This way users can perform canary, upgrade and rollback their jobs easily by changing version in the config. diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index efff5724b6..581aad4601 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -51,9 +51,6 @@ public class JobConfig extends MapConfig { public static final String JOB_ID = "job.id"; static final String DEFAULT_JOB_ID = "1"; - public static final String SAMZA_FWK_PATH = "samza.fwk.path"; - public static final String SAMZA_FWK_VERSION = "samza.fwk.version"; - public static final String JOB_COORDINATOR_SYSTEM = "job.coordinator.system"; public static final String JOB_DEFAULT_SYSTEM = "job.default.system"; @@ -314,22 +311,6 @@ public boolean getStandbyTasksEnabled() { return getStandbyTaskReplicationFactor() > 1; } - /** - * Reads the config to figure out if split deployment is enabled and fwk directory is setup - * @return fwk + "/" + version, or empty string if fwk path is not specified - */ - public static String getFwkPath(Config config) { - String fwkPath = config.get(SAMZA_FWK_PATH, ""); - String fwkVersion = config.get(SAMZA_FWK_VERSION); - if (fwkVersion == null || fwkVersion.isEmpty()) { - fwkVersion = "STABLE"; - } - if (!fwkPath.isEmpty()) { - fwkPath = fwkPath + File.separator + fwkVersion; - } - return fwkPath; - } - /** * The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container. 
* Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container, diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index b1d3215b68..36f1457a90 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -81,11 +81,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging { startpointManager.stop() } - val containerModel = coordinator.jobModel.getContainers.get(0) - - val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured - info("Process job. using fwkPath = " + fwkPath) - val taskConfig = new TaskConfig(config) val commandBuilderClass = taskConfig.getCommandClass(classOf[ShellCommandBuilder].getName) info("Using command builder class %s" format commandBuilderClass) @@ -98,7 +93,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging { .setConfig(config) .setId("0") .setUrl(coordinator.server.getUrl) - .setCommandPath(fwkPath) new ProcessJob(commandBuilder, coordinator) } diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 827bddfebc..1ee68aa0b2 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.container.LocalityManager; import org.apache.samza.coordinator.JobModelManager; @@ -860,30 +859,6 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception { cpm.stop(); } - @Test - public void testAppMasterWithFwk() { - Config conf = getConfig(); - SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); - MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); - MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - - - ContainerProcessManager cpm = - buildContainerProcessManager(new ClusterManagerConfig(conf), state, clusterResourceManager, Optional.empty()); - cpm.start(); - SamzaResource container2 = new SamzaResource(1, 1024, "", "id0"); - assertFalse(cpm.shouldShutdown()); - cpm.onResourceAllocated(container2); - - configVals.put(JobConfig.SAMZA_FWK_PATH, "/export/content/whatever"); - Config config1 = new MapConfig(configVals); - - ContainerProcessManager cpm1 = - buildContainerProcessManager(new ClusterManagerConfig(config), state, clusterResourceManager, Optional.empty()); - cpm1.start(); - cpm1.onResourceAllocated(container2); - } - @After public void teardown() { server.stop(); diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java index dab0f77ef6..2e19773b54 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java @@ -547,23 +547,6 @@ public void 
testGetStandbyTaskReplicationFactor() { assertEquals(JobConfig.DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR, jobConfig.getStandbyTaskReplicationFactor()); } - @Test - public void testGetFwkPath() { - String samzaFwkPath = "/path/to/samza/fwk", samzaFwkVersion = "1.2.3"; - - // has path and version - Config config = new MapConfig( - ImmutableMap.of(JobConfig.SAMZA_FWK_PATH, samzaFwkPath, JobConfig.SAMZA_FWK_VERSION, samzaFwkVersion)); - assertEquals(samzaFwkPath + File.separator + samzaFwkVersion, JobConfig.getFwkPath(config)); - - // only has path; use STABLE for version - config = new MapConfig(ImmutableMap.of(JobConfig.SAMZA_FWK_PATH, samzaFwkPath)); - assertEquals(samzaFwkPath + File.separator + "STABLE", JobConfig.getFwkPath(config)); - - // no path; return empty string - assertTrue(JobConfig.getFwkPath(new MapConfig()).isEmpty()); - } - @Test public void testGetMetadataFile() { String execEnvContainerId = "container-id"; diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala index 4df53fd89b..c70af8dd5e 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala @@ -44,7 +44,7 @@ class TestShellCommandBuilder { // if cmdPath is specified, the full path to the command should be adjusted @Test - def testCommandWithFwkPath { + def testBuildCommandWithCommandPath { val urlStr = "http://www.linkedin.com" val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava) val scb = new ShellCommandBuilder @@ -54,8 +54,8 @@ class TestShellCommandBuilder { val command = scb.buildCommand assertEquals("foo", command) - scb.setCommandPath("/fwk/path") + scb.setCommandPath("/package/path") val command1 = scb.buildCommand - assertEquals("/fwk/path/foo", command1) + assertEquals("/package/path/foo", command1) } } \ No newline at end of file diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh index cd50df3c57..5b3b7d1238 100755 --- a/samza-shell/src/main/bash/run-class.sh +++ b/samza-shell/src/main/bash/run-class.sh @@ -42,44 +42,16 @@ GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GC DEFAULT_LOG4J_FILE=$base_dir/lib/log4j.xml DEFAULT_LOG4J2_FILE=$base_dir/lib/log4j2.xml BASE_LIB_DIR="$base_dir/lib" -# JOB_LIB_DIR will be set for yarn container in ContainerUtil.java -# for others we set it to home_dir/lib -JOB_LIB_DIR="${JOB_LIB_DIR:-$base_dir/lib}" -export JOB_LIB_DIR=$JOB_LIB_DIR - -echo JOB_LIB_DIR=$JOB_LIB_DIR echo BASE_LIB_DIR=$BASE_LIB_DIR + CLASSPATH="" -if [ -d "$JOB_LIB_DIR" ] && [ "$JOB_LIB_DIR" != "$BASE_LIB_DIR" ]; then - # build a common classpath - # this class path will contain all the jars from the framework and the job's libs. - # in case of different version of the same lib - we pick the highest - - #all jars from the fwk - base_jars=`ls $BASE_LIB_DIR/*.[jw]ar` - #all jars from the job - job_jars=`for file in $JOB_LIB_DIR/*.[jw]ar; do name=\`basename $file\`; if [[ $base_jars != *"$name"* ]]; then echo "$file"; fi; done` - # get all lib jars and reverse sort it by versions - all_jars=`for file in $base_jars $job_jars; do echo \`basename $file|sed 's/.*[-]\([0-9]\+\..*\)[jw]ar$/\1/'\` $file; done|sort -t. 
-k 1,1nr -k 2,2nr -k 3,3nr -k 4,4nr|awk '{print $2}'` - # generate the class path based on the sorted result, all the jars need to be appended on newlines - # to ensure java argument length of 72 bytes is not violated - for jar in $all_jars; do CLASSPATH=$CLASSPATH" $jar \n"; done - - # for debug only - echo base_jars=$base_jars - echo job_jars=$job_jars - echo all_jars=$all_jars - echo generated combined CLASSPATH=$CLASSPATH -else - # default behavior, all the jars need to be appended on newlines - # to ensure line argument length of 72 bytes is not violated - for file in $BASE_LIB_DIR/*.[jw]ar; - do - CLASSPATH=$CLASSPATH" $file \n" - done - echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH -fi +# all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated +for file in $BASE_LIB_DIR/*.[jw]ar; +do + CLASSPATH=$CLASSPATH" $file \n" +done +echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH # In some cases (AWS) $JAVA_HOME/bin doesn't contain jar. if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index eade44dca6..14cb68f774 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -42,7 +42,6 @@ import org.apache.samza.clustermanager.ProcessorLaunchException; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.YarnConfig; import org.apache.samza.coordinator.JobModelManager; @@ -585,15 +584,7 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { */ public void runProcessor(String processorId, Container container, CommandBuilder cmdBuilder) throws IOException { String containerIdStr = ConverterUtils.toString(container.getId()); - // check if we have framework path specified. If yes - use it, if not use default ./__package/ - String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries String cmdPath = "./__package/"; - - String fwkPath = JobConfig.getFwkPath(this.config); - if(fwkPath != null && (! 
fwkPath.isEmpty())) { - cmdPath = fwkPath; - jobLib = "export JOB_LIB_DIR=./__package/lib"; - } cmdBuilder.setCommandPath(cmdPath); String command = cmdBuilder.buildCommand(); @@ -601,8 +592,9 @@ public void runProcessor(String processorId, Container container, CommandBuilder env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString())); Path packagePath = new Path(yarnConfig.getPackagePath()); - String formattedCommand = getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, jobLib, command, - ApplicationConstants.STDOUT, ApplicationConstants.STDERR); + String formattedCommand = + getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, + ApplicationConstants.STDERR); log.info("Running Processor ID: {} on Container ID: {} on host: {} using command: {} and env: {} and package path: {}", processorId, containerIdStr, container.getNodeHttpAddress(), formattedCommand, env, packagePath); @@ -696,18 +688,9 @@ private Map getEscapedEnvironmentVariablesMap(CommandBuilder cmd } - private String getFormattedCommand(String logDirExpansionVar, - String jobLib, - String command, - String stdOut, - String stdErr) { - if (!jobLib.isEmpty()) { - jobLib = "&& " + jobLib; // add job's libraries exported to an env variable - } - - return String - .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar, - jobLib, logDirExpansionVar, command, stdOut, stdErr); + private String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) { + return String.format("export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", + logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr); } /** diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index b363590c22..ee6a4989ee 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -48,9 +48,6 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { appId = client.submitApplication( config, List( - // we need something like this: - //"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec /bin/run-am.sh 1>logs/%s 2>logs/%s" - "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)), @@ -88,25 +85,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { } def buildAmCmd() = { - // figure out if we have framework is deployed into a separate location - val fwkPath = config.get(JobConfig.SAMZA_FWK_PATH, "") - var fwkVersion = config.get(JobConfig.SAMZA_FWK_VERSION) - if (fwkVersion == null || fwkVersion.isEmpty()) { - fwkVersion = "STABLE" - } - logger.info("Inside YarnJob: fwk_path is %s, ver is %s use it directly " format(fwkPath, fwkVersion)) - - var cmdExec = "./__package/bin/run-jc.sh" // default location - - if (!fwkPath.isEmpty()) { - // if we have framework installed as a separate package - use it - cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh" - - logger.info("Using FWK path: " + "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s". 
- format(ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec, - ApplicationConstants.STDOUT, ApplicationConstants.STDERR)) - - } + val cmdExec = "./__package/bin/run-jc.sh" // default location cmdExec }
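For reference, the launch-command construction that remains after these removals reduces to a single format string, used both for the AM command in YarnJob.scala (with cmdExec = "./__package/bin/run-jc.sh") and, via the simplified getFormattedCommand, for processor containers in YarnClusterResourceManager. Below is a minimal standalone Java sketch of that remaining behavior; the class name, the main driver, and the placeholder values standing in for YARN's ApplicationConstants and the CommandBuilder output are illustrative only, not the actual Samza classes.

    // Sketch only: mirrors the simplified command formatting kept by this patch,
    // after the jobLib/JOB_LIB_DIR export for a separately deployed framework was dropped.
    public class FormattedCommandSketch {

      // Same format string as the post-patch getFormattedCommand in YarnClusterResourceManager
      // and the inline string in YarnJob.scala.
      static String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
        return String.format("export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s",
            logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
      }

      public static void main(String[] args) {
        String logDir = "<LOG_DIR>";                  // placeholder for ApplicationConstants.LOG_DIR_EXPANSION_VAR
        String command = "./__package/bin/run-jc.sh"; // the default AM command retained in buildAmCmd()
        System.out.println(getFormattedCommand(logDir, command, "stdout", "stderr"));
      }
    }

Run directly, the sketch prints "export SAMZA_LOG_DIR=<LOG_DIR> && ln -sfn <LOG_DIR> logs && exec ./__package/bin/run-jc.sh 1>logs/stdout 2>logs/stderr": the command path always resolves under ./__package/, with no framework-path or JOB_LIB_DIR branch.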