From 3b9091dc699244ce14b4334e23e1a15e77b41bca Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 5 Nov 2025 14:50:38 +0200 Subject: [PATCH 1/4] Fix Flink IllegalThreadStateException on Java 8 --- .../FlinkPipelineExecutionEnvironment.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 029eff25a825..c84d206b87d0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -20,6 +20,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.runners.core.metrics.MetricsPusher; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -52,6 +54,8 @@ class FlinkPipelineExecutionEnvironment { private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + private static final Set protectedThreadGroups = ConcurrentHashMap.newKeySet(); + private final FlinkPipelineOptions options; /** @@ -143,6 +147,7 @@ public PipelineResult executePipeline() throws Exception { if (flinkBatchEnv != null) { if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName); + ensureFlinkCleanupComplete(flinkBatchEnv); return createAttachedPipelineResult(jobExecutionResult); } else { JobClient jobClient = flinkBatchEnv.executeAsync(jobName); @@ -151,6 +156,7 @@ public PipelineResult executePipeline() throws Exception { } else if (flinkStreamEnv != null) { if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName); + ensureFlinkCleanupComplete(flinkStreamEnv); return createAttachedPipelineResult(jobExecutionResult); } else { JobClient jobClient = flinkStreamEnv.executeAsync(jobName); @@ -161,6 +167,35 @@ public PipelineResult executePipeline() throws Exception { } } + /** + * Prevents ThreadGroup destruction while Flink cleanup threads are still running. + */ + private void ensureFlinkCleanupComplete(Object executionEnv) { + String javaVersion = System.getProperty("java.version"); + if (javaVersion == null || !javaVersion.startsWith("1.8")) { + return; + } + + ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup(); + if (currentThreadGroup == null) { + return; + } + + protectedThreadGroups.add(currentThreadGroup); + + Thread cleanupReleaser = new Thread(() -> { + try { + Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + protectedThreadGroups.remove(currentThreadGroup); + } + }, "FlinkCleanupReleaser"); + cleanupReleaser.setDaemon(true); + cleanupReleaser.start(); + } + private FlinkDetachedRunnerResult createDetachedPipelineResult( JobClient jobClient, FlinkPipelineOptions options) { LOG.info("Pipeline submitted in detached mode"); From 57c07f6c0aa9316382f5cb392f462a0ab0397117 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 5 Nov 2025 16:32:21 +0200 Subject: [PATCH 2/4] Fixed both Spotless formatting issues --- .../FlinkPipelineExecutionEnvironment.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index c84d206b87d0..09e0a159d380 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -167,9 +167,7 @@ public PipelineResult executePipeline() throws Exception { } } - /** - * Prevents ThreadGroup destruction while Flink cleanup threads are still running. - */ + /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */ private void ensureFlinkCleanupComplete(Object executionEnv) { String javaVersion = System.getProperty("java.version"); if (javaVersion == null || !javaVersion.startsWith("1.8")) { @@ -183,15 +181,18 @@ private void ensureFlinkCleanupComplete(Object executionEnv) { protectedThreadGroups.add(currentThreadGroup); - Thread cleanupReleaser = new Thread(() -> { - try { - Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - protectedThreadGroups.remove(currentThreadGroup); - } - }, "FlinkCleanupReleaser"); + Thread cleanupReleaser = + new Thread( + () -> { + try { + Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + protectedThreadGroups.remove(currentThreadGroup); + } + }, + "FlinkCleanupReleaser"); cleanupReleaser.setDaemon(true); cleanupReleaser.start(); } From 1860c25b5ed33533e0ad6983e51f4b59fbbbb8cd Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 5 Nov 2025 20:19:50 +0200 Subject: [PATCH 3/4] Applied Flink ThreadGroup fix to Java 8 batch jobs in local environments only --- .../flink/FlinkPipelineExecutionEnvironment.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 09e0a159d380..4e37c65c1ff2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -147,7 +147,7 @@ public PipelineResult executePipeline() throws Exception { if (flinkBatchEnv != null) { if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName); - ensureFlinkCleanupComplete(flinkBatchEnv); + ensureFlinkCleanupComplete(); return createAttachedPipelineResult(jobExecutionResult); } else { JobClient jobClient = flinkBatchEnv.executeAsync(jobName); @@ -156,7 +156,7 @@ public PipelineResult executePipeline() throws Exception { } else if (flinkStreamEnv != null) { if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName); - ensureFlinkCleanupComplete(flinkStreamEnv); + ensureFlinkCleanupComplete(); return createAttachedPipelineResult(jobExecutionResult); } else { JobClient jobClient = flinkStreamEnv.executeAsync(jobName); @@ -168,12 +168,21 @@ public PipelineResult executePipeline() throws Exception { } /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */ - private void ensureFlinkCleanupComplete(Object executionEnv) { + private void ensureFlinkCleanupComplete() { String javaVersion = System.getProperty("java.version"); if (javaVersion == null || !javaVersion.startsWith("1.8")) { return; } + if (flinkBatchEnv == null) { + return; + } + + String flinkMaster = options.getFlinkMaster(); + if (!flinkMaster.matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) { + return; + } + ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup(); if (currentThreadGroup == null) { return; From 77c0bc6d4a36d74c554f215079f57ff90b7ab11f Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 5 Nov 2025 20:57:34 +0200 Subject: [PATCH 4/4] Added executionEnv param --- .../flink/FlinkPipelineExecutionEnvironment.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 4e37c65c1ff2..973aa6c24298 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -31,8 +31,10 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; @@ -147,7 +149,7 @@ public PipelineResult executePipeline() throws Exception { if (flinkBatchEnv != null) { if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName); - ensureFlinkCleanupComplete(); + ensureFlinkCleanupComplete(flinkBatchEnv); return createAttachedPipelineResult(jobExecutionResult); } else { JobClient jobClient = flinkBatchEnv.executeAsync(jobName); @@ -156,7 +158,7 @@ public PipelineResult executePipeline() throws Exception { } else if (flinkStreamEnv != null) { if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName); - ensureFlinkCleanupComplete(); + ensureFlinkCleanupComplete(flinkStreamEnv); return createAttachedPipelineResult(jobExecutionResult); } else { JobClient jobClient = flinkStreamEnv.executeAsync(jobName); @@ -168,18 +170,14 @@ public PipelineResult executePipeline() throws Exception { } /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */ - private void ensureFlinkCleanupComplete() { + private void ensureFlinkCleanupComplete(Object executionEnv) { String javaVersion = System.getProperty("java.version"); if (javaVersion == null || !javaVersion.startsWith("1.8")) { return; } - if (flinkBatchEnv == null) { - return; - } - - String flinkMaster = options.getFlinkMaster(); - if (!flinkMaster.matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) { + if (!(executionEnv instanceof LocalStreamEnvironment + || executionEnv instanceof LocalEnvironment)) { return; }