Skip to content
Merged
10 changes: 9 additions & 1 deletion runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ runShadow {
if (project.hasProperty('sdkWorkerParallelism'))
args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"]

logger.info('Will start flink job server with args {}', args)

// Enable remote debugging.
jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
if (project.hasProperty("logLevel"))
Expand Down Expand Up @@ -228,9 +230,15 @@ def setupTask = project.tasks.register("flinkJobServerSetup", Exec) {
dependsOn shadowJar
def pythonDir = project.project(":sdks:python").projectDir
def flinkJobServerJar = shadowJar.archivePath
def flinkDir = project.project(":runners:flink").projectDir
def additionalArgs = ""
if (project.hasProperty('flinkConfDir'))
additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}"
else
additionalArgs += "--flink-conf-dir=$flinkDir/src/test/resources"

executable 'sh'
args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar}"
args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args \"${additionalArgs}\""
}

def cleanupTask = project.tasks.register("flinkJobServerCleanup", Exec) {
Expand Down
7 changes: 6 additions & 1 deletion sdks/python/scripts/run_job_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ while [[ $# -gt 0 ]]; do
shift
shift
;;
--additional_args)
ADDITIONAL_ARGS="$2"
shift
shift
;;
start)
STARTSTOP="$1"
shift
Expand Down Expand Up @@ -97,7 +102,7 @@ case $STARTSTOP in
fi

echo "Launching job server @ $JOB_PORT ..."
java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null &
java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 $ADDITIONAL_ARGS >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null &
mypid=$!
if kill -0 $mypid >/dev/null 2>&1; then
echo $mypid >> $pid
Expand Down