From aaeabf3d95e03f988f15a013c26ffdc70f08920e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 10:24:56 -0500 Subject: [PATCH 01/15] WIP: Attempt to fix flink XVR tests --- .../job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy index 828b51bf5c1e..10d6e60ef097 100644 --- a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy +++ b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy @@ -44,6 +44,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_XVR_Flink', tasks(":runners:flink:${CommonTestProperties.getFlinkVersion()}:job-server:validatesCrossLanguageRunner") commonJobProperties.setGradleSwitches(delegate) switches("-PpythonVersion=${pythonVersion}") + switches("-PflinkConfDir=$WORKSPACE/src/runners/flink/src/test/resources") } } } From ad5f94be1d1d20e2a9112a12c71726fd96a2927e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 10:49:58 -0500 Subject: [PATCH 02/15] Add some debugging info --- runners/flink/job-server/flink_job_server.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index b77e086a5325..6c76a70e3532 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -118,6 +118,8 @@ runShadow { if (project.hasProperty('sdkWorkerParallelism')) args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"] + logging.info('Will start flink job server with args {}', args) + // Enable remote debugging. 
jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] if (project.hasProperty("logLevel")) From e205ae185c424f82773018dd55dbd541c4740962 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 11:05:45 -0500 Subject: [PATCH 03/15] Fix logging --- runners/flink/job-server/flink_job_server.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 6c76a70e3532..333abee1229e 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -118,7 +118,7 @@ runShadow { if (project.hasProperty('sdkWorkerParallelism')) args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"] - logging.info('Will start flink job server with args {}', args) + logger.info('Will start flink job server with args {}', args) // Enable remote debugging. jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] From bc0970020fd5aa1fc2ee6afc0231ce83679d7e76 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 11:23:59 -0500 Subject: [PATCH 04/15] Fix pathing --- .../job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy index 10d6e60ef097..8ef9eaf7b634 100644 --- a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy +++ b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy @@ -44,7 +44,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_XVR_Flink', tasks(":runners:flink:${CommonTestProperties.getFlinkVersion()}:job-server:validatesCrossLanguageRunner") commonJobProperties.setGradleSwitches(delegate) 
switches("-PpythonVersion=${pythonVersion}") - switches("-PflinkConfDir=$WORKSPACE/src/runners/flink/src/test/resources") + switches("-PflinkConfDir=$WORKSPACE/runners/flink/src/test/resources") } } } From 0d67ef1b1628d017c1e2b4f6c33fe75cec5aa4f9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 13:14:33 -0500 Subject: [PATCH 05/15] Disable caching for now --- .../job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy index 8ef9eaf7b634..091387156d1d 100644 --- a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy +++ b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy @@ -45,6 +45,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_XVR_Flink', commonJobProperties.setGradleSwitches(delegate) switches("-PpythonVersion=${pythonVersion}") switches("-PflinkConfDir=$WORKSPACE/runners/flink/src/test/resources") + switches("--rerun-tasks") } } } From 44291392865dcb385b03f7395483d8b7729e7b0b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 14:20:47 -0500 Subject: [PATCH 06/15] Try manually setting flink-conf-dir --- runners/flink/job-server/flink_job_server.gradle | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 333abee1229e..731e93815c0b 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -113,8 +113,9 @@ runShadow { args += ["--flink-master=${project.property('flinkMaster')}"] else if (project.hasProperty('flinkMasterUrl')) args += ["--flink-master=${project.property('flinkMasterUrl')}"] - if (project.hasProperty('flinkConfDir')) - args += 
["--flink-conf-dir=${project.property('flinkConfDir')}"] + // if (project.hasProperty('flinkConfDir')) + // args += ["--flink-conf-dir=${project.property('flinkConfDir')}"] + args += ["--flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources"] if (project.hasProperty('sdkWorkerParallelism')) args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"] From 43dc0354543bce486825679a668c4f784f12ce16 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 15:55:33 -0500 Subject: [PATCH 07/15] Try manually setting flink-conf-dir --- runners/flink/job-server/flink_job_server.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 731e93815c0b..8a6ae421b6d3 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -115,7 +115,7 @@ runShadow { args += ["--flink-master=${project.property('flinkMasterUrl')}"] // if (project.hasProperty('flinkConfDir')) // args += ["--flink-conf-dir=${project.property('flinkConfDir')}"] - args += ["--flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources"] + args += ["flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources"] if (project.hasProperty('sdkWorkerParallelism')) args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"] From ba6c8722ed5d873ad2229d84aa13d6a03bbb28ce Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 16:22:15 -0500 Subject: [PATCH 08/15] Try setting it in run_job_server --- sdks/python/scripts/run_job_server.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/scripts/run_job_server.sh b/sdks/python/scripts/run_job_server.sh index 8f4a224e84e7..88f684ac7ecc 100755 --- 
a/sdks/python/scripts/run_job_server.sh +++ b/sdks/python/scripts/run_job_server.sh @@ -97,7 +97,7 @@ case $STARTSTOP in fi echo "Launching job server @ $JOB_PORT ..." - java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & + java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 --flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & mypid=$! if kill -0 $mypid >/dev/null 2>&1; then echo $mypid >> $pid From fdb45e9824c6b4eaecf549444f9de0b68797102f Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 17:06:05 -0500 Subject: [PATCH 09/15] Clean up method of passing extra args --- runners/flink/job-server/flink_job_server.gradle | 10 ++++++---- sdks/python/scripts/run_job_server.sh | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 8a6ae421b6d3..f938e27d7a89 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -113,9 +113,8 @@ runShadow { args += ["--flink-master=${project.property('flinkMaster')}"] else if (project.hasProperty('flinkMasterUrl')) args += ["--flink-master=${project.property('flinkMasterUrl')}"] - // if (project.hasProperty('flinkConfDir')) - // args += ["--flink-conf-dir=${project.property('flinkConfDir')}"] - args += ["flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources"] + if (project.hasProperty('flinkConfDir')) + args += ["--flink-conf-dir=${project.property('flinkConfDir')}"] if (project.hasProperty('sdkWorkerParallelism')) args += ["--sdk-worker-parallelism=${project.property('sdkWorkerParallelism')}"] @@ -231,9 +230,12 @@ def setupTask = project.tasks.register("flinkJobServerSetup", Exec) { dependsOn shadowJar def pythonDir = project.project(":sdks:python").projectDir def flinkJobServerJar = shadowJar.archivePath + def additionalArgs = "" + if (project.hasProperty('flinkConfDir')) + additionalArgs += "
--flink-conf-dir=${project.property('flinkConfDir')}" executable 'sh' - args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar}" + args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args ${additionalArgs}" } def cleanupTask = project.tasks.register("flinkJobServerCleanup", Exec) { diff --git a/sdks/python/scripts/run_job_server.sh b/sdks/python/scripts/run_job_server.sh index 88f684ac7ecc..93e10aa9a745 100755 --- a/sdks/python/scripts/run_job_server.sh +++ b/sdks/python/scripts/run_job_server.sh @@ -97,7 +97,7 @@ case $STARTSTOP in fi echo "Launching job server @ $JOB_PORT ..." - java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 --flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & + java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 $ADDITIONAL_ARGS >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & mypid=$! if kill -0 $mypid >/dev/null 2>&1; then echo $mypid >> $pid From 59520ac806a1685b670561d4f087a805ac3ccd12 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 18:03:19 -0500 Subject: [PATCH 10/15] Accept additional args in script --- sdks/python/scripts/run_job_server.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/python/scripts/run_job_server.sh b/sdks/python/scripts/run_job_server.sh index 93e10aa9a745..03e9244d4340 100755 --- a/sdks/python/scripts/run_job_server.sh +++ b/sdks/python/scripts/run_job_server.sh @@ -51,6 +51,11 @@ while [[ $# -gt 0 ]]; do shift shift ;; + --additional_args) + ADDITIONAL_ARGS="$2" + shift + shift + ;; start) STARTSTOP="$1" shift From 5382b02b539817f1effa2c31f4cfe7112dc62454 Mon Sep 17 00:00:00 2001
From: Danny McCormick Date: Thu, 17 Nov 2022 19:23:08 -0500 Subject: [PATCH 11/15] Try more explicit route --- runners/flink/job-server/flink_job_server.gradle | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index f938e27d7a89..0781d3948f31 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -230,9 +230,9 @@ def setupTask = project.tasks.register("flinkJobServerSetup", Exec) { dependsOn shadowJar def pythonDir = project.project(":sdks:python").projectDir def flinkJobServerJar = shadowJar.archivePath - def additionalArgs = "" - if (project.hasProperty('flinkConfDir')) - additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}" + def additionalArgs = "/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources" + // if (project.hasProperty('flinkConfDir')) + // additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}" executable 'sh' args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args ${additionalArgs}" From 3bd53339243c2ec8bb62aca55e7110fc4ea5a126 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 17 Nov 2022 21:47:48 -0500 Subject: [PATCH 12/15] Try more explicit route --- runners/flink/job-server/flink_job_server.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 0781d3948f31..73319018fe52 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -230,7 +230,7 @@ def setupTask = 
project.tasks.register("flinkJobServerSetup", Exec) { dependsOn shadowJar def pythonDir = project.project(":sdks:python").projectDir def flinkJobServerJar = shadowJar.archivePath - def additionalArgs = "/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources" + def additionalArgs = "--flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources" // if (project.hasProperty('flinkConfDir')) // additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}" From d251020dc2a3868bb841d92b3f75be5667b90db0 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 18 Nov 2022 06:49:52 -0500 Subject: [PATCH 13/15] Try to prove this works --- sdks/python/scripts/run_job_server.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/scripts/run_job_server.sh b/sdks/python/scripts/run_job_server.sh index 03e9244d4340..3af93cd4c473 100755 --- a/sdks/python/scripts/run_job_server.sh +++ b/sdks/python/scripts/run_job_server.sh @@ -102,7 +102,7 @@ case $STARTSTOP in fi echo "Launching job server @ $JOB_PORT ..." 
- java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 $ADDITIONAL_ARGS >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & + java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 --flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & mypid=$! if kill -0 $mypid >/dev/null 2>&1; then echo $mypid >> $pid From aab44ecbfb10dcb9a34e4acaadb4af96d303605c Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 18 Nov 2022 08:32:13 -0500 Subject: [PATCH 14/15] A little indirection --- runners/flink/job-server/flink_job_server.gradle | 2 +- sdks/python/scripts/run_job_server.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 73319018fe52..4acdebc6b2a8 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -235,7 +235,7 @@ def setupTask = project.tasks.register("flinkJobServerSetup", Exec) { // additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}" executable 'sh' - args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args ${additionalArgs}" + args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args \"${additionalArgs}\"" } def cleanupTask = project.tasks.register("flinkJobServerCleanup", Exec) { diff --git a/sdks/python/scripts/run_job_server.sh b/sdks/python/scripts/run_job_server.sh index 3af93cd4c473..03e9244d4340 100755 --- a/sdks/python/scripts/run_job_server.sh +++ b/sdks/python/scripts/run_job_server.sh @@ -102,7 +102,7 @@ case $STARTSTOP in fi echo "Launching job server @ $JOB_PORT ..."
- java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 --flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & + java -jar $JOB_SERVER_JAR --job-port=$JOB_PORT --artifact-port=$ARTIFACT_PORT --expansion-port=0 $ADDITIONAL_ARGS >$TEMP_DIR/$FILE_BASE.log 2>&1 </dev/null & mypid=$! if kill -0 $mypid >/dev/null 2>&1; then echo $mypid >> $pid From 049fabd44f6658653e2a3e947862bf8b67f48d7e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 18 Nov 2022 09:28:25 -0500 Subject: [PATCH 15/15] Fix pathing --- ..._PostCommit_CrossLanguageValidatesRunner_Flink.groovy | 2 -- runners/flink/job-server/flink_job_server.gradle | 9 ++++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy index 091387156d1d..828b51bf5c1e 100644 --- a/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy +++ b/.test-infra/jenkins/job_PostCommit_CrossLanguageValidatesRunner_Flink.groovy @@ -44,8 +44,6 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_XVR_Flink', tasks(":runners:flink:${CommonTestProperties.getFlinkVersion()}:job-server:validatesCrossLanguageRunner") commonJobProperties.setGradleSwitches(delegate) switches("-PpythonVersion=${pythonVersion}") - switches("-PflinkConfDir=$WORKSPACE/runners/flink/src/test/resources") - switches("--rerun-tasks") } } } diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 4acdebc6b2a8..d6e7a7f961cb 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -230,9 +230,12 @@ def setupTask = project.tasks.register("flinkJobServerSetup", Exec) { dependsOn shadowJar def pythonDir = project.project(":sdks:python").projectDir def flinkJobServerJar = shadowJar.archivePath - def additionalArgs =
"--flink-conf-dir=/home/jenkins/jenkins-slave/workspace/beam_SeedJob/runners/flink/src/test/resources" - // if (project.hasProperty('flinkConfDir')) - // additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}" + def flinkDir = project.project(":runners:flink").projectDir + def additionalArgs = "" + if (project.hasProperty('flinkConfDir')) + additionalArgs += " --flink-conf-dir=${project.property('flinkConfDir')}" + else + additionalArgs += "--flink-conf-dir=$flinkDir/src/test/resources" executable 'sh' args '-c', "$pythonDir/scripts/run_job_server.sh stop --group_id ${project.name} && $pythonDir/scripts/run_job_server.sh start --group_id ${project.name} --job_port ${jobPort} --artifact_port ${artifactPort} --job_server_jar ${flinkJobServerJar} --additional_args \"${additionalArgs}\""