Entry point for Flink REST API using external SDK driver program. #27
| return getExecutionEnvironment().execute(jobName); | ||
| try { | ||
| return getExecutionEnvironment().execute(jobName); | ||
| } catch (OptimizerPlanEnvironment.ProgramAbortException ex) { |
This needs to move to FlinkPipelineRunner.
Yes, that would be the better place.
runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkPipelineRunner.java
| // TODO: check for job service ready | ||
| Thread.sleep(5000); | ||
| success = true; |
This needs to be addressed.
I think we could use something like the option retrieval to check for the liveness of the JobService.
Otherwise I would remove these two lines here and just process the stdout below.
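One way to replace the fixed `Thread.sleep(5000)` is to poll until the job service endpoint accepts connections, bounded by a timeout. The following is a minimal sketch, not the PR's implementation: `JobServiceReadiness` and `awaitPort` are hypothetical names, and a successful TCP connect only shows that the port is open, not that the JobService answers requests (the option retrieval suggested above would be a stronger check).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

/** Hypothetical helper: polls a TCP port until it accepts connections or a deadline passes. */
final class JobServiceReadiness {

  private JobServiceReadiness() {}

  /** Returns true once a connection succeeds, false if the timeout elapses first. */
  static boolean awaitPort(String host, int port, Duration timeout) throws InterruptedException {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (System.nanoTime() - deadline < 0) {
      try (Socket socket = new Socket()) {
        // Short connect timeout so one attempt cannot exceed the overall budget by much.
        socket.connect(new InetSocketAddress(host, port), 500);
        return true;
      } catch (IOException e) {
        // Not reachable yet: back off briefly, then retry.
        Thread.sleep(100);
      }
    }
    return false;
  }
}
```

A caller could then fail fast with an `IllegalStateException` when `awaitPort(...)` returns false, instead of assuming the service is up after five seconds.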
| throw new IllegalStateException("Job service thread is not alive"); | ||
| } | ||
| } finally { | ||
| System.setErr(oldErr); |
The thread needs to be cleaned up here in case of errors. Otherwise an exception that is caught further down the stack would leave the thread lingering.
Maybe we could also use the ProcessManager here?
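The cleanup concern can be sketched with a success flag and a `finally` block that stops the thread whenever the guarded code fails. This is an illustration under assumptions: `DriverThreadCleanup` and `runOrStop` are hypothetical names, and it presumes the job service thread reacts to interruption, which the PR does not state.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: starts a service thread and stops it again if the guarded body fails. */
final class DriverThreadCleanup {

  private DriverThreadCleanup() {}

  static <T> T runOrStop(Thread serviceThread, Callable<T> body) throws Exception {
    serviceThread.start();
    boolean success = false;
    try {
      T result = body.call();
      success = true;
      return result;
    } finally {
      if (!success) {
        // Assumes the thread exits on interrupt; bound the wait so cleanup cannot hang.
        serviceThread.interrupt();
        serviceThread.join(5000);
      }
    }
  }
}
```

On success the thread is left running, so the caller stays responsible for shutting it down once the job has been handed over.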
| @VisibleForTesting | ||
| Process getUnderlyingProcess() { | ||
| public Process getUnderlyingProcess() { |
This leaks the ProcessManager. Could we instead add the required methods for this here?
| private void runDriverProgram() throws Exception { | ||
| ProcessManager processManager = ProcessManager.create(); | ||
| String executable = "bash"; |
Why bash? Maybe sh?
| ImmutableList.of("-c", String.format("exec %s " + DRIVER_CMD_FLAGS, driverCmd, jobPort)); | ||
| String processId = "client1"; | ||
| Duration timeout = Duration.ofSeconds(30); |
Maybe make this configurable?
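A configurable timeout could be as small as the sketch below, which keeps the 30-second value from the diff as the default. The class name `DriverTimeout` and the idea of reading a JVM system property are assumptions made for illustration. The PR might prefer a pipeline option or a constructor argument instead.

```java
import java.time.Duration;

/** Hypothetical helper: resolves the driver timeout from a system property, defaulting to 30s. */
final class DriverTimeout {

  static final Duration DEFAULT = Duration.ofSeconds(30);

  private DriverTimeout() {}

  static Duration fromProperty(String propertyName) {
    String raw = System.getProperty(propertyName);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT;
    }
    long seconds = Long.parseLong(raw.trim());
    if (seconds <= 0) {
      throw new IllegalArgumentException(propertyName + " must be positive, got " + seconds);
    }
    return Duration.ofSeconds(seconds);
  }
}
```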
| private final String driverCmd; | ||
| private FlinkJobServerDriver driver = null; | ||
| private Thread driverThread = null; |
| private Thread driverThread = null; | |
| private Thread driverThread; |
| private static String DRIVER_CMD_FLAGS = "--job_endpoint=localhost:%s"; | ||
| private final String driverCmd; | ||
| private FlinkJobServerDriver driver = null; |
| private FlinkJobServerDriver driver = null; | |
| private FlinkJobServerDriver driver; |
| // convey to the environment that the job was successfully constructed | ||
| throw new OptimizerPlanEnvironment.ProgramAbortException(); | ||
| } | ||
| throw new RuntimeException("Driver program failed."); |
| throw new RuntimeException("Driver program failed."); | |
| throw new RuntimeException("Driver program failed with exit value " + driverProcess.exitValue()); |
Printing the error output could also be useful.
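To illustrate including both the exit value and the error output in the failure message, here is a sketch built on the plain `ProcessBuilder` API rather than Beam's `ProcessManager`. `DriverFailureReport` and `runOrThrow` are hypothetical names, and the sketch assumes the driver's stderr is small enough to buffer in memory.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical helper: runs a command and reports exit value plus stderr on failure. */
final class DriverFailureReport {

  private DriverFailureReport() {}

  static void runOrThrow(List<String> command) throws IOException, InterruptedException {
    ProcessBuilder builder = new ProcessBuilder(command);
    // Pass stdout through so an unread pipe cannot block the child process.
    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
    Process process = builder.start();
    // Drain stderr before waiting; this returns once the child closes the stream.
    String stderr = readAll(process.getErrorStream());
    int exitValue = process.waitFor();
    if (exitValue != 0) {
      throw new RuntimeException(
          "Driver program failed with exit value " + exitValue + ": " + stderr.trim());
    }
  }

  private static String readAll(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    int read;
    while ((read = in.read(buffer)) != -1) {
      out.write(buffer, 0, read);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }
}
```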
…FlinkPipelineRunner.java Co-Authored-By: Maximilian Michels <mxm@apache.org>
| * | ||
| * <p>Finally Flink launches the job. | ||
| */ | ||
| public class LyftFlinkPipelineRunner { |
@mxm I would like to eventually add this class upstream. As for a suitable name, maybe FlinkPortableClientRunner, since it runs the SDK client to construct the pipeline?
Comments addressed. The execution handover is much simpler now, after a few changes and cleanups in the surrounding classes.
Upstream ticket: https://issues.apache.org/jira/browse/BEAM-8471
upstream: apache#9872
709fb8a to
0b3082d
Compare
This is the more flexible approach of launching pipelines via the Flink REST API. Instead of running the driver program during build time and baking the pipeline into a Flink fat jar, the pipeline is constructed at job submission time through integration with the OptimizerPlanEnvironment. More details in the javadoc.
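The handover discussed in the review, where `ProgramAbortException` signals that the job was constructed successfully, follows a general pattern: the environment captures the plan when `execute` is called and then aborts the program with a marker exception so that nothing runs twice. The sketch below shows only that control flow with stand-in classes. `PlanHandoverSketch`, `CapturingEnvironment` and `ProgramAbort` are hypothetical and are not Flink's or Beam's classes.

```java
import java.util.function.Consumer;

/** Stand-in sketch of the capture-then-abort pattern; none of these are Flink classes. */
final class PlanHandoverSketch {

  private PlanHandoverSketch() {}

  /** Marker thrown after the plan was captured (stand-in for a program-abort exception). */
  static final class ProgramAbort extends RuntimeException {}

  /** Environment that records the plan instead of running it. */
  static final class CapturingEnvironment {
    private String capturedPlan;

    void execute(String plan) {
      capturedPlan = plan;
      // Abort the user program: the plan is all the caller needs.
      throw new ProgramAbort();
    }

    String capturedPlan() {
      return capturedPlan;
    }
  }

  /** Runs the program only far enough to obtain its plan. */
  static String extractPlan(Consumer<CapturingEnvironment> program) {
    CapturingEnvironment env = new CapturingEnvironment();
    try {
      program.accept(env);
    } catch (ProgramAbort expected) {
      // Expected: signals that the plan was constructed successfully.
    }
    if (env.capturedPlan() == null) {
      throw new IllegalStateException("Program finished without calling execute()");
    }
    return env.capturedPlan();
  }
}
```

Code placed after the `execute` call in the program never runs, which is why the review asks where the abort exception should be caught and rethrown.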