diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index ae20a29a0c..43e6a7c659 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -173,10 +173,16 @@ object YarnJob extends Logging { @VisibleForTesting private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig, jobConfig: JobConfig): Map[String, String] = { - val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) val envMapBuilder = Map.newBuilder[String, String] - envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> - Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)) + if (jobConfig.getConfigLoaderFactory.isPresent) { + envMapBuilder += ShellCommandConfig.ENV_SUBMISSION_CONFIG -> + Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)) + } else { + // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config) + envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> + Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)) + } envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts) val clusterBasedJobCoordinatorDependencyIsolationEnabled = jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java index c10c890a9c..daa719bcf4 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java @@ -111,4 +111,24 @@ public void testBuildEnvironmentWithAMJavaHome() throws IOException { assertEquals(expected, JavaConverters.mapAsJavaMapConverter( YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); } + + @Test + public void testBuildJobSubmissionEnvironment() throws IOException { + Config config = new MapConfig(new ImmutableMap.Builder() + .put(JobConfig.JOB_NAME, "jobName") + .put(JobConfig.JOB_ID, "jobId") + .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory") + .put(YarnConfig.AM_JVM_OPTIONS, "") + .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true") + .build()); + String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper() + .writeValueAsString(config)); + Map expected = ImmutableMap.of( + ShellCommandConfig.ENV_SUBMISSION_CONFIG(), expectedSubmissionConfig, + ShellCommandConfig.ENV_JAVA_OPTS(), "", + ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "true", + ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib"); + assertEquals(expected, JavaConverters.mapAsJavaMapConverter( + YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); + } } \ No newline at end of file