Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 0 additions & 59 deletions docs/learn/documentation/versioned/jobs/split-deployment.md

This file was deleted.

19 changes: 0 additions & 19 deletions samza-core/src/main/java/org/apache/samza/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public class JobConfig extends MapConfig {
public static final String JOB_ID = "job.id";
static final String DEFAULT_JOB_ID = "1";

public static final String SAMZA_FWK_PATH = "samza.fwk.path";
public static final String SAMZA_FWK_VERSION = "samza.fwk.version";

public static final String JOB_COORDINATOR_SYSTEM = "job.coordinator.system";
public static final String JOB_DEFAULT_SYSTEM = "job.default.system";

Expand Down Expand Up @@ -314,22 +311,6 @@ public boolean getStandbyTasksEnabled() {
return getStandbyTaskReplicationFactor() > 1;
}

/**
* Reads the config to figure out if split deployment is enabled and fwk directory is setup
* @return fwk + "/" + version, or empty string if fwk path is not specified
*/
public static String getFwkPath(Config config) {
String fwkPath = config.get(SAMZA_FWK_PATH, "");
String fwkVersion = config.get(SAMZA_FWK_VERSION);
if (fwkVersion == null || fwkVersion.isEmpty()) {
fwkVersion = "STABLE";
}
if (!fwkPath.isEmpty()) {
fwkPath = fwkPath + File.separator + fwkVersion;
}
return fwkPath;
}

/**
* The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
* Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
startpointManager.stop()
}

val containerModel = coordinator.jobModel.getContainers.get(0)

val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured
info("Process job. using fwkPath = " + fwkPath)

val taskConfig = new TaskConfig(config)
val commandBuilderClass = taskConfig.getCommandClass(classOf[ShellCommandBuilder].getName)
info("Using command builder class %s" format commandBuilderClass)
Expand All @@ -98,7 +93,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
.setConfig(config)
.setId("0")
.setUrl(coordinator.server.getUrl)
.setCommandPath(fwkPath)

new ProcessJob(commandBuilder, coordinator)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.coordinator.JobModelManager;
Expand Down Expand Up @@ -860,30 +859,6 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
cpm.stop();
}

@Test
public void testAppMasterWithFwk() {
Config conf = getConfig();
SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);


ContainerProcessManager cpm =
buildContainerProcessManager(new ClusterManagerConfig(conf), state, clusterResourceManager, Optional.empty());
cpm.start();
SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
assertFalse(cpm.shouldShutdown());
cpm.onResourceAllocated(container2);

configVals.put(JobConfig.SAMZA_FWK_PATH, "/export/content/whatever");
Config config1 = new MapConfig(configVals);

ContainerProcessManager cpm1 =
buildContainerProcessManager(new ClusterManagerConfig(config), state, clusterResourceManager, Optional.empty());
cpm1.start();
cpm1.onResourceAllocated(container2);
}

@After
public void teardown() {
server.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,23 +547,6 @@ public void testGetStandbyTaskReplicationFactor() {
assertEquals(JobConfig.DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR, jobConfig.getStandbyTaskReplicationFactor());
}

@Test
public void testGetFwkPath() {
String samzaFwkPath = "/path/to/samza/fwk", samzaFwkVersion = "1.2.3";

// has path and version
Config config = new MapConfig(
ImmutableMap.of(JobConfig.SAMZA_FWK_PATH, samzaFwkPath, JobConfig.SAMZA_FWK_VERSION, samzaFwkVersion));
assertEquals(samzaFwkPath + File.separator + samzaFwkVersion, JobConfig.getFwkPath(config));

// only has path; use STABLE for version
config = new MapConfig(ImmutableMap.of(JobConfig.SAMZA_FWK_PATH, samzaFwkPath));
assertEquals(samzaFwkPath + File.separator + "STABLE", JobConfig.getFwkPath(config));

// no path; return empty string
assertTrue(JobConfig.getFwkPath(new MapConfig()).isEmpty());
}

@Test
public void testGetMetadataFile() {
String execEnvContainerId = "container-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class TestShellCommandBuilder {

// if cmdPath is specified, the full path to the command should be adjusted
@Test
def testCommandWithFwkPath {
def testBuildCommandWithCommandPath {
val urlStr = "http://www.linkedin.com"
val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
val scb = new ShellCommandBuilder
Expand All @@ -54,8 +54,8 @@ class TestShellCommandBuilder {
val command = scb.buildCommand
assertEquals("foo", command)

scb.setCommandPath("/fwk/path")
scb.setCommandPath("/package/path")
val command1 = scb.buildCommand
assertEquals("/fwk/path/foo", command1)
assertEquals("/package/path/foo", command1)
}
}
42 changes: 7 additions & 35 deletions samza-shell/src/main/bash/run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,16 @@ GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GC
DEFAULT_LOG4J_FILE=$base_dir/lib/log4j.xml
DEFAULT_LOG4J2_FILE=$base_dir/lib/log4j2.xml
BASE_LIB_DIR="$base_dir/lib"
# JOB_LIB_DIR will be set for yarn container in ContainerUtil.java
# for others we set it to home_dir/lib
JOB_LIB_DIR="${JOB_LIB_DIR:-$base_dir/lib}"

export JOB_LIB_DIR=$JOB_LIB_DIR

echo JOB_LIB_DIR=$JOB_LIB_DIR
echo BASE_LIB_DIR=$BASE_LIB_DIR

CLASSPATH=""
if [ -d "$JOB_LIB_DIR" ] && [ "$JOB_LIB_DIR" != "$BASE_LIB_DIR" ]; then
# build a common classpath
# this class path will contain all the jars from the framework and the job's libs.
# in case of different version of the same lib - we pick the highest

#all jars from the fwk
base_jars=`ls $BASE_LIB_DIR/*.[jw]ar`
#all jars from the job
job_jars=`for file in $JOB_LIB_DIR/*.[jw]ar; do name=\`basename $file\`; if [[ $base_jars != *"$name"* ]]; then echo "$file"; fi; done`
# get all lib jars and reverse sort it by versions
all_jars=`for file in $base_jars $job_jars; do echo \`basename $file|sed 's/.*[-]\([0-9]\+\..*\)[jw]ar$/\1/'\` $file; done|sort -t. -k 1,1nr -k 2,2nr -k 3,3nr -k 4,4nr|awk '{print $2}'`
# generate the class path based on the sorted result, all the jars need to be appended on newlines
# to ensure java argument length of 72 bytes is not violated
for jar in $all_jars; do CLASSPATH=$CLASSPATH" $jar \n"; done

# for debug only
echo base_jars=$base_jars
echo job_jars=$job_jars
echo all_jars=$all_jars
echo generated combined CLASSPATH=$CLASSPATH
else
# default behavior, all the jars need to be appended on newlines
# to ensure line argument length of 72 bytes is not violated
for file in $BASE_LIB_DIR/*.[jw]ar;
do
CLASSPATH=$CLASSPATH" $file \n"
done
echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
fi
# all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated
for file in $BASE_LIB_DIR/*.[jw]ar;
do
CLASSPATH=$CLASSPATH" $file \n"
done
echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH

# In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.samza.clustermanager.ProcessorLaunchException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
Expand Down Expand Up @@ -585,24 +584,17 @@ public void onStopContainerError(ContainerId containerId, Throwable t) {
*/
public void runProcessor(String processorId, Container container, CommandBuilder cmdBuilder) throws IOException {
String containerIdStr = ConverterUtils.toString(container.getId());
// check if we have framework path specified. If yes - use it, if not use default ./__package/
String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
String cmdPath = "./__package/";

String fwkPath = JobConfig.getFwkPath(this.config);
if(fwkPath != null && (! fwkPath.isEmpty())) {
cmdPath = fwkPath;
jobLib = "export JOB_LIB_DIR=./__package/lib";
}
cmdBuilder.setCommandPath(cmdPath);
String command = cmdBuilder.buildCommand();

Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));

Path packagePath = new Path(yarnConfig.getPackagePath());
String formattedCommand = getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, jobLib, command,
ApplicationConstants.STDOUT, ApplicationConstants.STDERR);
String formattedCommand =
getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT,
ApplicationConstants.STDERR);

log.info("Running Processor ID: {} on Container ID: {} on host: {} using command: {} and env: {} and package path: {}",
processorId, containerIdStr, container.getNodeHttpAddress(), formattedCommand, env, packagePath);
Expand Down Expand Up @@ -696,18 +688,9 @@ private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmd
}


private String getFormattedCommand(String logDirExpansionVar,
String jobLib,
String command,
String stdOut,
String stdErr) {
if (!jobLib.isEmpty()) {
jobLib = "&& " + jobLib; // add job's libraries exported to an env variable
}

return String
.format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
jobLib, logDirExpansionVar, command, stdOut, stdErr);
private String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
return String.format("export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s",
logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
appId = client.submitApplication(
config,
List(
// we need something like this:
//"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec <fwk_path>/bin/run-am.sh 1>logs/%s 2>logs/%s"

"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s"
format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
cmdExec, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
Expand Down Expand Up @@ -88,25 +85,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}

def buildAmCmd() = {
// figure out if we have framework is deployed into a separate location
val fwkPath = config.get(JobConfig.SAMZA_FWK_PATH, "")
var fwkVersion = config.get(JobConfig.SAMZA_FWK_VERSION)
if (fwkVersion == null || fwkVersion.isEmpty()) {
fwkVersion = "STABLE"
}
logger.info("Inside YarnJob: fwk_path is %s, ver is %s use it directly " format(fwkPath, fwkVersion))

var cmdExec = "./__package/bin/run-jc.sh" // default location

if (!fwkPath.isEmpty()) {
// if we have framework installed as a separate package - use it
cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh"

logger.info("Using FWK path: " + "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s".
format(ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec,
ApplicationConstants.STDOUT, ApplicationConstants.STDERR))

}
val cmdExec = "./__package/bin/run-jc.sh" // default location
cmdExec
}

Expand Down