diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 029eff25a825..973aa6c24298
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,6 +20,8 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -29,8 +31,10 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
@@ -52,6 +56,8 @@ class FlinkPipelineExecutionEnvironment {
   private static final Logger LOG =
       LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
 
+  private static final Set<ThreadGroup> protectedThreadGroups = ConcurrentHashMap.newKeySet();
+
   private final FlinkPipelineOptions options;
 
   /**
@@ -143,6 +149,7 @@ public PipelineResult executePipeline() throws Exception {
     if (flinkBatchEnv != null) {
       if (options.getAttachedMode()) {
         JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName);
+        ensureFlinkCleanupComplete(flinkBatchEnv);
         return createAttachedPipelineResult(jobExecutionResult);
       } else {
         JobClient jobClient = flinkBatchEnv.executeAsync(jobName);
@@ -151,6 +158,7 @@
     } else if (flinkStreamEnv != null) {
       if (options.getAttachedMode()) {
         JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName);
+        ensureFlinkCleanupComplete(flinkStreamEnv);
         return createAttachedPipelineResult(jobExecutionResult);
       } else {
         JobClient jobClient = flinkStreamEnv.executeAsync(jobName);
@@ -161,6 +169,41 @@
     }
   }
 
+  /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */
+  private void ensureFlinkCleanupComplete(Object executionEnv) {
+    String javaVersion = System.getProperty("java.version");
+    if (javaVersion == null || !javaVersion.startsWith("1.8")) {
+      return;
+    }
+
+    if (!(executionEnv instanceof LocalStreamEnvironment
+        || executionEnv instanceof LocalEnvironment)) {
+      return;
+    }
+
+    ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
+    if (currentThreadGroup == null) {
+      return;
+    }
+
+    protectedThreadGroups.add(currentThreadGroup);
+
+    Thread cleanupReleaser =
+        new Thread(
+            () -> {
+              try {
+                Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              } finally {
+                protectedThreadGroups.remove(currentThreadGroup);
+              }
+            },
+            "FlinkCleanupReleaser");
+    cleanupReleaser.setDaemon(true);
+    cleanupReleaser.start();
+  }
+
   private FlinkDetachedRunnerResult createDetachedPipelineResult(
       JobClient jobClient, FlinkPipelineOptions options) {
     LOG.info("Pipeline submitted in detached mode");
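
Note: the hunks above only populate `protectedThreadGroups` and release entries after a grace period; the code that consults the set before tearing down a ThreadGroup is not part of this diff. The snippet below is a minimal, standalone sketch of the same grace-period pattern with a hypothetical consumer. `CleanupGuard`, `protectForMillis`, and `isProtected` are illustrative names only and are not Beam or Flink APIs.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard: shutdown code could check isProtected(...) before destroying a group.
final class CleanupGuard {

  // Thread groups that must not be destroyed yet, mirroring protectedThreadGroups above.
  private static final Set<ThreadGroup> PROTECTED = ConcurrentHashMap.newKeySet();

  private CleanupGuard() {}

  /** Marks the group as protected, then releases it after the given grace period. */
  static void protectForMillis(ThreadGroup group, long graceMillis) {
    PROTECTED.add(group);
    Thread releaser =
        new Thread(
            () -> {
              try {
                Thread.sleep(graceMillis); // give background cleanup threads time to finish
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                PROTECTED.remove(group); // always release, even if interrupted
              }
            },
            "CleanupGuardReleaser");
    releaser.setDaemon(true); // never keep the JVM alive just for the releaser
    releaser.start();
  }

  /** Returns true while the group is still inside its grace period. */
  static boolean isProtected(ThreadGroup group) {
    return PROTECTED.contains(group);
  }
}

The daemon flag keeps the releaser thread from blocking JVM exit, and the `finally` block guarantees the group is eventually removed from the set even if the sleep is interrupted, which is the same design choice made in `ensureFlinkCleanupComplete`.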