Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.time.Duration;
import java.util.List;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.Config;
Expand All @@ -47,8 +45,8 @@ public class RemoteApplicationRunner implements ApplicationRunner {
private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class);
private static final long DEFAULT_SLEEP_DURATION_MS = 2000;

private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
private final RemoteJobPlanner planner;
private final SamzaApplication app;
private final Config config;

/**
* Constructors a {@link RemoteApplicationRunner} to run the {@code app} with the {@code config}.
Expand All @@ -57,13 +55,21 @@ public class RemoteApplicationRunner implements ApplicationRunner {
* @param config configuration for the application
*/
public RemoteApplicationRunner(SamzaApplication app, Config config) {
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
this.planner = new RemoteJobPlanner(appDesc);
this.app = app;
this.config = config;
}

@Override
public void run(ExternalContext externalContext) {
if (new JobConfig(config).getConfigLoaderFactory().isPresent()) {
JobRunner runner = new JobRunner(JobPlanner.generateSingleJobConfig(config));
runner.submit();
return;
}

// TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
try {
JobPlanner planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(app, config));
List<JobConfig> jobConfigs = planner.prepareJobs();
if (jobConfigs.isEmpty()) {
throw new SamzaException("No jobs to run.");
Expand All @@ -85,7 +91,7 @@ public void kill() {
// since currently we only support single actual remote job, we can get its status without
// building the execution plan.
try {
JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(config));
LOG.info("Killing job {}", jc.getName());
JobRunner runner = new JobRunner(jc);
runner.kill();
Expand All @@ -99,7 +105,7 @@ public ApplicationStatus status() {
// since currently we only support single actual remote job, we can get its status without
// building the execution plan
try {
JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
JobConfig jc = new JobConfig(JobPlanner.generateSingleJobConfig(config));
return getApplicationStatus(jc);
} catch (Throwable t) {
throw new SamzaException("Failed to get status for application", t);
Expand All @@ -113,7 +119,7 @@ public void waitForFinish() {

@Override
public boolean waitForFinish(Duration timeout) {
JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(config));
boolean finished = true;
long timeoutInMs = timeout.toMillis();
long startTimeInMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,20 @@ public void testWaitForFinishTimesout() {
assertFalse("Application finished before the timeout.", finished);
}

@Test
public void testRunWithConfigLoaderFactoryPresent() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I guess the code as-is isn't very unit test friendly. It would be nice to test the invocation of .submit vs .run on JobRunner depending on the presence of the ConfigLoaderFactory.

Maybe we could factor out the JobRunner creation into its own method, and use a spy to stub out the return value of that method to be a mock JobRunner that we can verify? Disclaimer though, I'm not sure what the best practices are of using spies...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, I figured that too. In general, I prefer not to mess with production code for the sake of unit tests.

The current unit actually test the new flow, I passed null SamzaApplication to RemoteApplicationRunner's constructor. In RemoteApplicationRunner#run, if it goes the legacy flow, it will throw exception when constructing RemoteJobPlanner where SamzaApplication is required. Do you think this is enough?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some precedence elsewhere in samza code; ZkJobCoordinator.readJobModelFromMetadataStore for example uses a similar method structure + spy in unit test.

You are correct that the test written now does ensure that the legacy flow isn't triggered, but it does little to verify the behavior of the new flow. I won't block the PR on this though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 that there is a bit of missing test coverage here. It would be nice if this class was set up better for improved testing, but I also won't block the PR on this.

Map<String, String> config = new HashMap<>();
config.put(ApplicationConfig.APP_NAME, "test-app");
config.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory");
config.put(JobConfig.STREAM_JOB_FACTORY_CLASS, MockStreamJobFactory.class.getName());
runner = new RemoteApplicationRunner(null, new MapConfig(config));

runner.run(null);
}

@Test
public void testGetStatus() {
Map m = new HashMap<String, String>();
Map<String, String> m = new HashMap<>();
m.put(JobConfig.JOB_NAME, "jobName");
m.put(JobConfig.STREAM_JOB_FACTORY_CLASS, MockStreamJobFactory.class.getName());

Expand All @@ -101,8 +112,8 @@ public MockStreamJobFactory() {
@Override
public StreamJob getJob(final Config config) {

StreamJob streamJob = new StreamJob() {
JobConfig c = (JobConfig) config;
return new StreamJob() {
JobConfig c = new JobConfig(config);

@Override
public StreamJob submit() {
Expand Down Expand Up @@ -137,8 +148,6 @@ public ApplicationStatus getStatus() {
}
}
};

return streamJob;
}
}
}