From 16c0d74b9433207b2b59cf53064bd46dea7816bd Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 10:31:18 -0800 Subject: [PATCH 1/4] SAMZA-2408: Update RemoteApplicationRunner to submit job only Design: https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner Changes: 1. Depending on the existence of job.config.loader.factory, RemoteApplicationRunner will alternatively only submit the job, without planning. 2. Late initialize RemoteJobPlanner only when needed to avoid executing user code in submitting only mode. API Changes: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Upgrade Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Usage Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Tests: N/A --- .../runtime/RemoteApplicationRunner.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 9969b9d966..26752c8a30 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -22,8 +22,6 @@ import java.time.Duration; import java.util.List; import org.apache.samza.SamzaException; -import org.apache.samza.application.descriptors.ApplicationDescriptor; -import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; import org.apache.samza.application.descriptors.ApplicationDescriptorUtil; import org.apache.samza.application.SamzaApplication; import org.apache.samza.config.Config; @@ -47,8 +45,8 @@ public class RemoteApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class); private static final long DEFAULT_SLEEP_DURATION_MS = 2000; - private final ApplicationDescriptorImpl appDesc; - private final RemoteJobPlanner planner; + private final SamzaApplication app; + private final Config config; /** * Constructors a {@link RemoteApplicationRunner} to run the {@code app} with the {@code config}. @@ -57,13 +55,21 @@ public class RemoteApplicationRunner implements ApplicationRunner { * @param config configuration for the application */ public RemoteApplicationRunner(SamzaApplication app, Config config) { - this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); - this.planner = new RemoteJobPlanner(appDesc); + this.app = app; + this.config = config; } @Override public void run(ExternalContext externalContext) { + if (new JobConfig(this.config).getConfigLoaderFactory().isPresent()) { + JobRunner runner = new JobRunner(config); + runner.submit(); + return; + } + + // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. try { + JobPlanner planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(app, config)); List jobConfigs = planner.prepareJobs(); if (jobConfigs.isEmpty()) { throw new SamzaException("No jobs to run."); @@ -85,7 +91,7 @@ public void kill() { // since currently we only support single actual remote job, we can get its status without // building the execution plan. try { - JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig())); + JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(config)); LOG.info("Killing job {}", jc.getName()); JobRunner runner = new JobRunner(jc); runner.kill(); @@ -99,7 +105,7 @@ public ApplicationStatus status() { // since currently we only support single actual remote job, we can get its status without // building the execution plan try { - JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig())); + JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(config)); return getApplicationStatus(jc); } catch (Throwable t) { throw new SamzaException("Failed to get status for application", t); @@ -113,7 +119,7 @@ public void waitForFinish() { @Override public boolean waitForFinish(Duration timeout) { - JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig())); + JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(config)); boolean finished = true; long timeoutInMs = timeout.toMillis(); long startTimeInMs = System.currentTimeMillis(); From 4105111a81a0921ccccf61598eba3589f7b23d60 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 10:54:18 -0800 Subject: [PATCH 2/4] Use config instead of this.config --- .../java/org/apache/samza/runtime/RemoteApplicationRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 26752c8a30..475b7a3afc 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -61,7 +61,7 @@ public RemoteApplicationRunner(SamzaApplication app, Config config) { @Override public void run(ExternalContext externalContext) { - if (new JobConfig(this.config).getConfigLoaderFactory().isPresent()) { + if (new JobConfig(config).getConfigLoaderFactory().isPresent()) { JobRunner runner = new JobRunner(config); runner.submit(); return; From 4f864b2f87dc3a364e5344340b6768de942fc9ea Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Tue, 14 Jan 2020 12:42:06 -0800 Subject: [PATCH 3/4] Add unit tests and clean up ticket --- .../runtime/RemoteApplicationRunner.java | 2 +- .../runtime/TestRemoteApplicationRunner.java | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 475b7a3afc..c6cd5fa17e 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -67,7 +67,7 @@ public void run(ExternalContext externalContext) { return; } - // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. try { JobPlanner planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(app, config)); List jobConfigs = planner.prepareJobs(); diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java index 1464ece1c4..f38269482e 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java @@ -75,9 +75,20 @@ public void testWaitForFinishTimesout() { assertFalse("Application finished before the timeout.", finished); } + @Test + public void testRunWithConfigLoaderFactoryPresent() { + Map config = new HashMap<>(); + config.put(ApplicationConfig.APP_NAME, "test-app"); + config.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory"); + config.put(JobConfig.STREAM_JOB_FACTORY_CLASS, MockStreamJobFactory.class.getName()); + runner = new RemoteApplicationRunner(null, new MapConfig(config)); + + runner.run(null); + } + @Test public void testGetStatus() { - Map m = new HashMap(); + Map m = new HashMap<>(); m.put(JobConfig.JOB_NAME, "jobName"); m.put(JobConfig.STREAM_JOB_FACTORY_CLASS, MockStreamJobFactory.class.getName()); @@ -101,8 +112,8 @@ public MockStreamJobFactory() { @Override public StreamJob getJob(final Config config) { - StreamJob streamJob = new StreamJob() { - JobConfig c = (JobConfig) config; + return new StreamJob() { + JobConfig c = new JobConfig(config); @Override public StreamJob submit() { @@ -137,8 +148,6 @@ public ApplicationStatus getStatus() { } } }; - - return streamJob; } } } From ba8ab367649bf94e977f55c4a331e6e7ff07cf8c Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Wed, 15 Jan 2020 13:47:43 -0800 Subject: [PATCH 4/4] Use JobPlanner.generateSingleJobConfig to be consistent with kill, status and waitForFinish. This is also used in ExecutionPlanner as well before. --- .../java/org/apache/samza/runtime/RemoteApplicationRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index c6cd5fa17e..3af5db1eaf 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -62,7 +62,7 @@ public RemoteApplicationRunner(SamzaApplication app, Config config) { @Override public void run(ExternalContext externalContext) { if (new JobConfig(config).getConfigLoaderFactory().isPresent()) { - JobRunner runner = new JobRunner(config); + JobRunner runner = new JobRunner(JobPlanner.generateSingleJobConfig(config)); runner.submit(); return; }