From 160af59cfa6c117bf1ff49328763fa2cfa76cd9a Mon Sep 17 00:00:00 2001
From: Vitaly Terentyev
Date: Tue, 11 Mar 2025 22:59:42 +0400
Subject: [PATCH 1/2] Fix pipeline translate for Spark3

---
 ...rkStreamingPortablePipelineTranslator.java | 74 +++++++++++++------
 1 file changed, 51 insertions(+), 23 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
index 505a91e03b53..1f558b4b6c39 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
@@ -30,7 +30,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -63,6 +65,7 @@
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
 import org.apache.spark.streaming.dstream.ConstantInputDStream;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -157,17 +160,27 @@ private static void translateImpulse(
         .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
         .map(CoderHelpers.fromByteFunction(windowCoder));
 
-    final ConstantInputDStream<WindowedValue<byte[]>> inputDStream =
-        new ConstantInputDStream<>(
-            context.getStreamingContext().ssc(),
-            emptyByteArrayRDD.rdd(),
-            JavaSparkContext$.MODULE$.fakeClassTag());
-
-    final JavaDStream<WindowedValue<byte[]>> stream =
-        JavaDStream.fromDStream(inputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-
-    UnboundedDataset<byte[]> output =
-        new UnboundedDataset<>(stream, Collections.singletonList(inputDStream.id()));
+    UnboundedDataset<byte[]> output;
+    if (context.getSparkContext().version().startsWith("3")) {
+      Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+      rddQueue.offer(emptyByteArrayRDD);
+      JavaInputDStream<WindowedValue<byte[]>> emptyByteArrayStream =
+          context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+      output =
+          new UnboundedDataset<>(
+              emptyByteArrayStream,
+              Collections.singletonList(emptyByteArrayStream.inputDStream().id()));
+    } else {
+      final ConstantInputDStream<WindowedValue<byte[]>> inputDStream =
+          new ConstantInputDStream<>(
+              context.getStreamingContext().ssc(),
+              emptyByteArrayRDD.rdd(),
+              JavaSparkContext$.MODULE$.fakeClassTag());
+
+      final JavaDStream<WindowedValue<byte[]>> stream =
+          JavaDStream.fromDStream(inputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
+      output = new UnboundedDataset<>(stream, Collections.singletonList(inputDStream.id()));
+    }
 
     // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
     GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
@@ -307,11 +320,18 @@ private static void translateFlatten(
     List<Integer> streamSources = new ArrayList<>();
 
     if (inputsMap.isEmpty()) {
-      final JavaRDD<WindowedValue<T>> emptyRDD = context.getSparkContext().emptyRDD();
-      final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
-          new SingleEmitInputDStream<>(context.getStreamingContext().ssc(), emptyRDD.rdd());
-      unifiedStreams =
-          JavaDStream.fromDStream(singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
+      if (context.getSparkContext().version().startsWith("3")) {
+        Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+        q.offer(context.getSparkContext().emptyRDD());
+        unifiedStreams = context.getStreamingContext().queueStream(q);
+      } else {
+        final JavaRDD<WindowedValue<T>> emptyRDD = context.getSparkContext().emptyRDD();
+        final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
+            new SingleEmitInputDStream<>(context.getStreamingContext().ssc(), emptyRDD.rdd());
+        unifiedStreams =
+            JavaDStream.fromDStream(
+                singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
+      }
     } else {
       List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
       for (String inputId : inputsMap.values()) {
@@ -322,13 +342,21 @@ private static void translateFlatten(
           dStreams.add(unboundedDataset.getDStream());
         } else {
           // create a single RDD stream.
-          final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
-              new SingleEmitInputDStream<WindowedValue<T>>(
-                  context.getStreamingContext().ssc(), ((BoundedDataset) dataset).getRDD().rdd());
-          final JavaDStream<WindowedValue<T>> dStream =
-              JavaDStream.fromDStream(
-                  singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-
+          JavaDStream<WindowedValue<T>> dStream;
+          if (context.getSparkContext().version().startsWith("3")) {
+            Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+            q.offer(((BoundedDataset) dataset).getRDD());
+            // TODO (https://github.com/apache/beam/issues/20426): this is not recoverable from
+            // checkpoint!
+            dStream = context.getStreamingContext().queueStream(q);
+          } else {
+            final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
+                new SingleEmitInputDStream<WindowedValue<T>>(
+                    context.getStreamingContext().ssc(), ((BoundedDataset) dataset).getRDD().rdd());
+            dStream =
+                JavaDStream.fromDStream(
+                    singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
+          }
           dStreams.add(dStream);
         }
       }
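[Editorial sketch, not part of the patch series.] Patch 1 branches on context.getSparkContext().version() and, on Spark 3, builds the impulse stream through JavaStreamingContext.queueStream instead of instantiating ConstantInputDStream against Spark's Scala API with JavaSparkContext$.MODULE$.fakeClassTag(). The following minimal, self-contained Java sketch shows that queueStream pattern in isolation; it assumes a local Spark 3.x runtime, and the class name, master URL, and sample data are invented for illustration.

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class QueueStreamSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("queue-stream-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // A queue holding exactly one RDD, mirroring translateImpulse: the single
    // empty byte array stands in for Beam's impulse element.
    Queue<JavaRDD<byte[]>> rddQueue = new LinkedBlockingQueue<>();
    rddQueue.offer(jssc.sparkContext().parallelize(Arrays.asList(new byte[0])));

    // With oneAtATime=true the stream emits the queued RDD in one batch and
    // empty RDDs in later batches, giving one-shot semantics.
    JavaInputDStream<byte[]> oneShot = jssc.queueStream(rddQueue, true /* oneAtATime */);
    oneShot.foreachRDD(rdd -> System.out.println("batch size: " + rdd.count()));

    jssc.start();
    jssc.awaitTerminationOrTimeout(5_000);
    jssc.stop();
  }
}

queueStream is long-standing public API on both Spark 2 and Spark 3, which is presumably why patch 2 below can drop the version check entirely.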
From 47fc49811288a34a490fa90a1e412d21e39e7b27 Mon Sep 17 00:00:00 2001
From: Vitaly Terentyev
Date: Wed, 12 Mar 2025 19:45:41 +0400
Subject: [PATCH 2/2] Refactoring

---
 ...rkStreamingPortablePipelineTranslator.java | 66 +++++--------
 1 file changed, 16 insertions(+), 50 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
index 1f558b4b6c39..54c102ee2b3f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
@@ -61,12 +61,10 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaInputDStream;
-import org.apache.spark.streaming.dstream.ConstantInputDStream;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -160,27 +158,14 @@ private static void translateImpulse(
         .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
         .map(CoderHelpers.fromByteFunction(windowCoder));
 
-    UnboundedDataset<byte[]> output;
-    if (context.getSparkContext().version().startsWith("3")) {
-      Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
-      rddQueue.offer(emptyByteArrayRDD);
-      JavaInputDStream<WindowedValue<byte[]>> emptyByteArrayStream =
-          context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
-      output =
-          new UnboundedDataset<>(
-              emptyByteArrayStream,
-              Collections.singletonList(emptyByteArrayStream.inputDStream().id()));
-    } else {
-      final ConstantInputDStream<WindowedValue<byte[]>> inputDStream =
-          new ConstantInputDStream<>(
-              context.getStreamingContext().ssc(),
-              emptyByteArrayRDD.rdd(),
-              JavaSparkContext$.MODULE$.fakeClassTag());
-
-      final JavaDStream<WindowedValue<byte[]>> stream =
-          JavaDStream.fromDStream(inputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-      output = new UnboundedDataset<>(stream, Collections.singletonList(inputDStream.id()));
-    }
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyByteArrayStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+    UnboundedDataset<byte[]> output =
+        new UnboundedDataset<>(
+            emptyByteArrayStream,
+            Collections.singletonList(emptyByteArrayStream.inputDStream().id()));
 
     // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
     GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
@@ -320,18 +305,9 @@ private static void translateFlatten(
     List<Integer> streamSources = new ArrayList<>();
 
     if (inputsMap.isEmpty()) {
-      if (context.getSparkContext().version().startsWith("3")) {
-        Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
-        q.offer(context.getSparkContext().emptyRDD());
-        unifiedStreams = context.getStreamingContext().queueStream(q);
-      } else {
-        final JavaRDD<WindowedValue<T>> emptyRDD = context.getSparkContext().emptyRDD();
-        final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
-            new SingleEmitInputDStream<>(context.getStreamingContext().ssc(), emptyRDD.rdd());
-        unifiedStreams =
-            JavaDStream.fromDStream(
-                singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-      }
+      Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+      q.offer(context.getSparkContext().emptyRDD());
+      unifiedStreams = context.getStreamingContext().queueStream(q);
     } else {
       List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
       for (String inputId : inputsMap.values()) {
@@ -342,21 +318,11 @@ private static void translateFlatten(
           dStreams.add(unboundedDataset.getDStream());
         } else {
           // create a single RDD stream.
-          JavaDStream<WindowedValue<T>> dStream;
-          if (context.getSparkContext().version().startsWith("3")) {
-            Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
-            q.offer(((BoundedDataset) dataset).getRDD());
-            // TODO (https://github.com/apache/beam/issues/20426): this is not recoverable from
-            // checkpoint!
-            dStream = context.getStreamingContext().queueStream(q);
-          } else {
-            final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
-                new SingleEmitInputDStream<WindowedValue<T>>(
-                    context.getStreamingContext().ssc(), ((BoundedDataset) dataset).getRDD().rdd());
-            dStream =
-                JavaDStream.fromDStream(
-                    singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-          }
+          Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+          q.offer(((BoundedDataset) dataset).getRDD());
+          // TODO (https://github.com/apache/beam/issues/20426): this is not recoverable from
+          // checkpoint!
+          JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
           dStreams.add(dStream);
         }
      }
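[Editorial sketch, not part of the patch series.] After patch 2, translateFlatten always feeds a bounded dataset into the unified stream as a single-RDD queue stream and unions it with the unbounded inputs. Below is a minimal standalone illustration of that union, assuming the same local Spark 3.x setup as the earlier sketch; all names and sample data are invented.

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FlattenUnionSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("flatten-union-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // Stand-in for an unbounded input, stubbed here with its own queue-backed stream.
    Queue<JavaRDD<String>> unboundedQueue = new LinkedBlockingQueue<>();
    unboundedQueue.offer(jssc.sparkContext().parallelize(Arrays.asList("u1", "u2")));
    JavaDStream<String> unbounded = jssc.queueStream(unboundedQueue);

    // A bounded dataset folded in as a single-RDD queue stream, as the refactored
    // translateFlatten does. As the code's TODO notes, such a queue-backed stream
    // is not recoverable from a Spark checkpoint.
    Queue<JavaRDD<String>> boundedQueue = new LinkedBlockingQueue<>();
    boundedQueue.offer(jssc.sparkContext().parallelize(Arrays.asList("b1")));
    JavaDStream<String> bounded = jssc.queueStream(boundedQueue);

    JavaDStream<String> unified = unbounded.union(bounded);
    unified.foreachRDD(rdd -> System.out.println("unified batch: " + rdd.collect()));

    jssc.start();
    jssc.awaitTerminationOrTimeout(5_000);
    jssc.stop();
  }
}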