From 5889452fe2809fac1b7b0e27dcb0e7d140b2dc02 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Thu, 9 Jan 2020 17:47:21 -0800 Subject: [PATCH 1/3] SAMZA-2409: Update YarnJob to construct job submission env varible Design: https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner Changes: 1. Depending the on the existence of job.config.loader.factory, YarnJob will alternatively wraps job submission configs to Yarn. API Changes: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Upgrade Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Usage Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Tests: Unit Tests --- .../org/apache/samza/job/yarn/YarnJob.scala | 11 +++++++--- .../apache/samza/job/yarn/TestYarnJob.java | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index ae20a29a0c..f05f508a81 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -173,10 +173,15 @@ object YarnJob extends Logging { @VisibleForTesting private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig, jobConfig: JobConfig): Map[String, String] = { - val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) val envMapBuilder = Map.newBuilder[String, String] - envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> - Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)) + if (jobConfig.getConfigLoaderFactory.isPresent) { + envMapBuilder += ShellCommandConfig.ENV_SUBMISSION_CONFIG -> + Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)) + } else { + val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) + envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> + Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)) + } envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts) val clusterBasedJobCoordinatorDependencyIsolationEnabled = jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java index c10c890a9c..daa719bcf4 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java @@ -111,4 +111,24 @@ public void testBuildEnvironmentWithAMJavaHome() throws IOException { assertEquals(expected, JavaConverters.mapAsJavaMapConverter( YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); } + + @Test + public void testBuildJobSubmissionEnvironment() throws IOException { + Config config = new MapConfig(new ImmutableMap.Builder() + .put(JobConfig.JOB_NAME, "jobName") + .put(JobConfig.JOB_ID, "jobId") + .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory") + .put(YarnConfig.AM_JVM_OPTIONS, "") + .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true") + .build()); + String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper() + .writeValueAsString(config)); + Map expected = ImmutableMap.of( + ShellCommandConfig.ENV_SUBMISSION_CONFIG(), expectedSubmissionConfig, + ShellCommandConfig.ENV_JAVA_OPTS(), "", + ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "true", + ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib"); + assertEquals(expected, JavaConverters.mapAsJavaMapConverter( + YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); + } } \ No newline at end of file From 8229a053986d9de12c026e519f390f13fb79d037 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Thu, 9 Jan 2020 17:47:21 -0800 Subject: [PATCH 2/3] SAMZA-2409: Update YarnJob to construct job submission env varible Design: https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner Changes: 1. Depending the on the existence of job.config.loader.factory, YarnJob will alternatively wraps job submission configs to Yarn. API Changes: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Upgrade Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Usage Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Tests: Unit Tests --- .../org/apache/samza/job/yarn/YarnJob.scala | 12 ++++++++--- .../apache/samza/job/yarn/TestYarnJob.java | 20 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index ae20a29a0c..17e882a07f 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -173,10 +173,16 @@ object YarnJob extends Logging { @VisibleForTesting private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig, jobConfig: JobConfig): Map[String, String] = { - val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) val envMapBuilder = Map.newBuilder[String, String] - envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> - Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)) + if (jobConfig.getConfigLoaderFactory.isPresent) { + envMapBuilder += ShellCommandConfig.ENV_SUBMISSION_CONFIG -> + Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)) + } else { + // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) + envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> + Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)) + } envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts) val clusterBasedJobCoordinatorDependencyIsolationEnabled = jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java index c10c890a9c..daa719bcf4 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java @@ -111,4 +111,24 @@ public void testBuildEnvironmentWithAMJavaHome() throws IOException { assertEquals(expected, JavaConverters.mapAsJavaMapConverter( YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); } + + @Test + public void testBuildJobSubmissionEnvironment() throws IOException { + Config config = new MapConfig(new ImmutableMap.Builder() + .put(JobConfig.JOB_NAME, "jobName") + .put(JobConfig.JOB_ID, "jobId") + .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory") + .put(YarnConfig.AM_JVM_OPTIONS, "") + .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true") + .build()); + String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper() + .writeValueAsString(config)); + Map expected = ImmutableMap.of( + ShellCommandConfig.ENV_SUBMISSION_CONFIG(), expectedSubmissionConfig, + ShellCommandConfig.ENV_JAVA_OPTS(), "", + ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "true", + ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib"); + assertEquals(expected, JavaConverters.mapAsJavaMapConverter( + YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); + } } \ No newline at end of file From 99a667638703f68889b097ad8ad637f14137ceeb Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Tue, 14 Jan 2020 12:32:21 -0800 Subject: [PATCH 3/3] Add clean up ticket --- .../src/main/scala/org/apache/samza/job/yarn/YarnJob.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index 17e882a07f..43e6a7c659 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -178,7 +178,7 @@ object YarnJob extends Logging { envMapBuilder += ShellCommandConfig.ENV_SUBMISSION_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)) } else { - // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))