diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 9969b9d966..3af5db1eaf 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -22,8 +22,6 @@ import java.time.Duration; import java.util.List; import org.apache.samza.SamzaException; -import org.apache.samza.application.descriptors.ApplicationDescriptor; -import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; import org.apache.samza.application.descriptors.ApplicationDescriptorUtil; import org.apache.samza.application.SamzaApplication; import org.apache.samza.config.Config; @@ -47,8 +45,8 @@ public class RemoteApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class); private static final long DEFAULT_SLEEP_DURATION_MS = 2000; - private final ApplicationDescriptorImpl appDesc; - private final RemoteJobPlanner planner; + private final SamzaApplication app; + private final Config config; /** * Constructors a {@link RemoteApplicationRunner} to run the {@code app} with the {@code config}. @@ -57,13 +55,21 @@ public class RemoteApplicationRunner implements ApplicationRunner { * @param config configuration for the application */ public RemoteApplicationRunner(SamzaApplication app, Config config) { - this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); - this.planner = new RemoteJobPlanner(appDesc); + this.app = app; + this.config = config; } @Override public void run(ExternalContext externalContext) { + if (new JobConfig(config).getConfigLoaderFactory().isPresent()) { + JobRunner runner = new JobRunner(JobPlanner.generateSingleJobConfig(config)); + runner.submit(); + return; + } + + // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. try { + JobPlanner planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(app, config)); List jobConfigs = planner.prepareJobs(); if (jobConfigs.isEmpty()) { throw new SamzaException("No jobs to run."); @@ -85,7 +91,7 @@ public void kill() { // since currently we only support single actual remote job, we can get its status without // building the execution plan. try { - JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig())); + JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(config)); LOG.info("Killing job {}", jc.getName()); JobRunner runner = new JobRunner(jc); runner.kill(); @@ -99,7 +105,7 @@ public ApplicationStatus status() { // since currently we only support single actual remote job, we can get its status without // building the execution plan try { - JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig())); + JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(config)); return getApplicationStatus(jc); } catch (Throwable t) { throw new SamzaException("Failed to get status for application", t); @@ -113,7 +119,7 @@ public void waitForFinish() { @Override public boolean waitForFinish(Duration timeout) { - JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig())); + JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(config)); boolean finished = true; long timeoutInMs = timeout.toMillis(); long startTimeInMs = System.currentTimeMillis(); diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java index 1464ece1c4..f38269482e 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java @@ -75,9 +75,20 @@ public void testWaitForFinishTimesout() { assertFalse("Application finished before the timeout.", finished); } + @Test + public void testRunWithConfigLoaderFactoryPresent() { + Map config = new HashMap<>(); + config.put(ApplicationConfig.APP_NAME, "test-app"); + config.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory"); + config.put(JobConfig.STREAM_JOB_FACTORY_CLASS, MockStreamJobFactory.class.getName()); + runner = new RemoteApplicationRunner(null, new MapConfig(config)); + + runner.run(null); + } + @Test public void testGetStatus() { - Map m = new HashMap(); + Map m = new HashMap<>(); m.put(JobConfig.JOB_NAME, "jobName"); m.put(JobConfig.STREAM_JOB_FACTORY_CLASS, MockStreamJobFactory.class.getName()); @@ -101,8 +112,8 @@ public MockStreamJobFactory() { @Override public StreamJob getJob(final Config config) { - StreamJob streamJob = new StreamJob() { - JobConfig c = (JobConfig) config; + return new StreamJob() { + JobConfig c = new JobConfig(config); @Override public StreamJob submit() { @@ -137,8 +148,6 @@ public ApplicationStatus getStatus() { } } }; - - return streamJob; } } }