@@ -7,5 +7,6 @@
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test",
"https://github.com/apache/beam/pull/34123": "noting that PR #34123 should run this test",
"https://github.com/apache/beam/pull/34080": "noting that PR #34080 should run this test",
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test"
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test"
}
@@ -6,5 +6,6 @@
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test",
"https://github.com/apache/beam/pull/34080": "noting that PR #34080 should run this test",
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test"
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test"
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -81,6 +81,7 @@
## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
* Added support for streaming side-inputs in the Spark Classic runner ([#18136](https://github.com/apache/beam/issues/18136)).
* Beam ZetaSQL is deprecated and will be removed no earlier than Beam 2.68.0 ([#34423](https://github.com/apache/beam/issues/34423)).
Users are recommended to switch to [Calcite SQL](https://beam.apache.org/documentation/dsls/sql/calcite/overview/) dialect.

1 change: 0 additions & 1 deletion runners/spark/spark_runner.gradle
@@ -350,7 +350,6 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
filter {
// UNBOUNDED View.CreatePCollectionView not supported
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent'
// TODO(https://github.com/apache/beam/issues/29973)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231
@@ -18,13 +18,16 @@
package org.apache.beam.runners.spark;

import java.util.List;
import org.apache.beam.runners.spark.translation.streaming.CreateStreamingSparkView;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.util.construction.SplittableParDoNaiveBounded;
import org.apache.beam.sdk.util.construction.UnsupportedOverrideFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/** {@link PTransform} overrides for Spark runner. */
@@ -50,6 +53,20 @@ public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
PTransformOverride.of(
PTransformMatchers.urnEqualTo(PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN),
new SplittableParDoNaiveBounded.OverrideFactory()));
} else {
builder.add(
PTransformOverride.of(
// For streaming pipelines, this override is applied only when the PTransform has the
// same URN as PTransformTranslation.CREATE_VIEW_TRANSFORM and at least one of its
// inputs is UNBOUNDED.
PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN)
.and(
(AppliedPTransform<?, ?, ?> appliedPTransform) ->
appliedPTransform.getInputs().values().stream()
.anyMatch(
e -> e.isBounded().equals(PCollection.IsBounded.UNBOUNDED))),
CreateStreamingSparkView.Factory.INSTANCE));
}
return builder.build();
}
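For context, a minimal pipeline sketch (not part of this PR) of the shape the new matcher targets: the side input is built from an unbounded source, so the View.CreatePCollectionView behind it has an UNBOUNDED input and is rerouted to CreateStreamingSparkView. The sources, windowing, and runner options below are illustrative assumptions, not code from the PR.

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class StreamingSideInputPipelineSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);

    // Side input from an unbounded source: the CreatePCollectionView produced by
    // asSingletonView() has an UNBOUNDED input, so the new override applies.
    PCollectionView<Long> maxSoFar =
        p.apply("SideSource", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
            .apply("SideWindow", Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))))
            .apply("MaxPerWindow", Max.longsGlobally().asSingletonView());

    PCollection<Long> mainInput =
        p.apply("MainSource", GenerateSequence.from(0).withRate(5, Duration.standardSeconds(1)))
            .apply("MainWindow", Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))));

    mainInput.apply(
        "PairWithWindowMax",
        ParDo.of(
                new DoFn<Long, KV<Long, Long>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // Reads the per-window maximum from the streaming side input.
                    c.output(KV.of(c.element(), c.sideInput(maxSoFar)));
                  }
                })
            .withSideInputs(maxSoFar));

    p.run(); // unbounded pipeline; runs until cancelled
  }
}
```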
@@ -26,6 +26,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -101,6 +102,23 @@ public static <T> List<byte[]> toByteArrays(Iterable<T> values, Coder<T> coder)
return res;
}

/**
* Utility method for serializing an Iterator of values using the specified coder.
*
* @param values Values to serialize.
* @param coder Coder to serialize with.
* @param <T> type of value that is serialized
* @return List of bytes representing serialized objects.
*/
public static <T> List<byte[]> toByteArrays(Iterator<T> values, Coder<T> coder) {
List<byte[]> res = new ArrayList<>();
while (values.hasNext()) {
final T value = values.next();
res.add(toByteArray(value, coder));
}
return res;
}

/**
* Utility method for deserializing a byte array using the specified coder.
*
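A short usage sketch for the new Iterator overload, assuming it behaves like the existing Iterable-based toByteArrays (one encoded byte[] per element). The import path for CoderHelpers is assumed here.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.VarLongCoder;

public class IteratorCodingSketch {
  public static void main(String[] args) {
    Iterator<Long> values = Arrays.asList(1L, 2L, 3L).iterator();
    // Each element is encoded independently with the given coder.
    List<byte[]> encoded = CoderHelpers.toByteArrays(values, VarLongCoder.of());
    System.out.println(encoded.size()); // 3
  }
}
```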
@@ -71,6 +71,7 @@ public class EvaluationContext {
new HashMap<>();
private final PipelineOptions options;
private final SerializablePipelineOptions serializableOptions;
private boolean streamingSideInput = false;

public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
this.jsc = jsc;
@@ -358,4 +359,31 @@ <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
public String storageLevel() {
return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
}

/**
* Checks if any of the side inputs in the pipeline are streaming side inputs.
*
* <p>If at least one of the side inputs is a streaming side input, this method returns true. When
* streaming side inputs are present, the {@link
* org.apache.beam.runners.spark.util.CachedSideInputReader} will not be used.
*
* @return true if any of the side inputs in the pipeline are streaming side inputs, false
* otherwise
*/
public boolean isStreamingSideInput() {
return streamingSideInput;
}

/**
* Marks that the pipeline contains at least one streaming side input.
*
* <p>When this method is called, it sets the streamingSideInput flag to true, indicating that the
* {@link org.apache.beam.runners.spark.util.CachedSideInputReader} should not be used for
* processing side inputs.
*/
public void useStreamingSideInput() {
if (!this.streamingSideInput) {
this.streamingSideInput = true;
}
}
}
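A minimal sketch of the flag's contract, using only the constructor and methods shown above. The EvaluationContext package and the local Spark setup are assumptions; in the runner itself, useStreamingSideInput() would presumably be called while a streaming view is translated, which is not shown in this excerpt.

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.api.java.JavaSparkContext;

public class StreamingSideInputFlagSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    try (JavaSparkContext jsc = new JavaSparkContext("local[2]", "flag-sketch")) {
      EvaluationContext context = new EvaluationContext(jsc, pipeline, options);
      context.useStreamingSideInput(); // flipped once a streaming side input is seen
      System.out.println(context.isStreamingSideInput()); // true; stays true for the pipeline
    }
  }
}
```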
@@ -31,9 +31,8 @@
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.util.CachedSideInputReader;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.runners.spark.util.SideInputReaderFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
@@ -66,6 +65,7 @@ public class MultiDoFnFunction<InputT, OutputT>
private final MetricsContainerStepMapAccumulator metricsAccum;
private final String stepName;
private final DoFn<InputT, OutputT> doFn;
private final boolean useStreamingSideInput;
private transient boolean wasSetupCalled;
private final SerializablePipelineOptions options;
private final TupleTag<OutputT> mainOutputTag;
@@ -106,7 +106,8 @@ public MultiDoFnFunction(
boolean stateful,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping,
boolean useBoundedConcurrentOutput) {
boolean useBoundedConcurrentOutput,
boolean useStreamingSideInput) {
this.metricsAccum = metricsAccum;
this.stepName = stepName;
this.doFn = SerializableUtils.clone(doFn);
@@ -121,6 +122,7 @@ public MultiDoFnFunction(
this.doFnSchemaInformation = doFnSchemaInformation;
this.sideInputMapping = sideInputMapping;
this.useBoundedConcurrentOutput = useBoundedConcurrentOutput;
this.useStreamingSideInput = useStreamingSideInput;
}

@Override
@@ -178,7 +180,7 @@ public TimerInternals timerInternals() {
DoFnRunners.simpleRunner(
options.get(),
doFn,
CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
SideInputReaderFactory.create(this.useStreamingSideInput, this.sideInputs),
processor.getOutputManager(),
mainOutputTag,
additionalOutputTags,
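SideInputReaderFactory itself is not shown in this excerpt. Judging from the line it replaces, a plausible, clearly hypothetical sketch of the choice it appears to make is below: keep the caching wrapper for batch side inputs and bypass it for streaming ones, so re-broadcast values are not masked by a stale cache. The class and method names in the sketch are invented; only SparkSideInputReader, CachedSideInputReader, and SideInputReader come from the existing code.

```java
package org.apache.beam.runners.spark.util;

import org.apache.beam.runners.core.SideInputReader;

/** Hypothetical sketch only; not the PR's actual SideInputReaderFactory. */
final class SideInputReaderChoiceSketch {
  private SideInputReaderChoiceSketch() {}

  static SideInputReader choose(boolean useStreamingSideInput, SparkSideInputReader reader) {
    // Caching is only correct when broadcast side-input contents never change;
    // streaming side inputs can be re-broadcast with new values, so read them directly.
    return useStreamingSideInput ? reader : CachedSideInputReader.of(reader);
  }
}
```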
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.translation;

import java.io.Serializable;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;

/**
* Metadata class for side inputs in the Spark runner. Contains serialized data, type information, and
* coder for side input processing.
*/
public class SideInputMetadata implements Serializable {
private final byte[] data;
private final SparkPCollectionView.Type type;
private final Coder<Iterable<WindowedValue<?>>> coder;

/**
* Constructor for SideInputMetadata.
*
* @param data The serialized side input data as byte array
* @param type The type of the SparkPCollectionView
* @param coder The coder for iterables of windowed values
*/
SideInputMetadata(
byte[] data, SparkPCollectionView.Type type, Coder<Iterable<WindowedValue<?>>> coder) {
this.data = data;
this.type = type;
this.coder = coder;
}

/**
* Creates a new instance of SideInputMetadata.
*
* @param data The serialized side input data as byte array
* @param type The type of the SparkPCollectionView
* @param coder The coder for iterables of windowed values
* @return A new SideInputMetadata instance
*/
public static SideInputMetadata create(
byte[] data, SparkPCollectionView.Type type, Coder<Iterable<WindowedValue<?>>> coder) {
return new SideInputMetadata(data, type, coder);
}

/**
* Converts this metadata to a {@link SideInputBroadcast} instance.
*
* @return A new {@link SideInputBroadcast} instance created from this metadata
*/
@SuppressWarnings("rawtypes")
public SideInputBroadcast toSideInputBroadcast() {
return SideInputBroadcast.create(this.data, this.type, this.coder);
}
}
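A hedged round-trip sketch for the new class: serialize windowed values with CoderHelpers, wrap them in SideInputMetadata, and rehydrate them as a SideInputBroadcast (the actual Spark broadcast happens later on the driver). The coder construction and the import paths for the runner classes are illustrative assumptions.

```java
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.SideInputMetadata;
import org.apache.beam.runners.spark.translation.SparkPCollectionView;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;

public class SideInputMetadataSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void main(String[] args) {
    // Coder for an iterable of windowed values, here String values in the global window.
    Coder<Iterable<WindowedValue<?>>> coder =
        (Coder)
            IterableCoder.of(
                WindowedValue.FullWindowedValueCoder.of(
                    StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
    List<WindowedValue<?>> values =
        Collections.singletonList(WindowedValue.valueInGlobalWindow("hello"));

    SideInputMetadata metadata =
        SideInputMetadata.create(
            CoderHelpers.toByteArray(values, coder),
            SparkPCollectionView.Type.STREAMING,
            coder);

    // Rehydrate the serialized view; broadcasting to executors happens elsewhere.
    SideInputBroadcast broadcast = metadata.toSideInputBroadcast();
    System.out.println(broadcast != null); // true
  }
}
```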
@@ -28,7 +28,6 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/** SparkPCollectionView is used to pass serialized views to lambdas. */
@SuppressWarnings({
@@ -41,17 +40,39 @@ public class SparkPCollectionView implements Serializable {
// Holds the view --> broadcast mapping. Transient so it will be null from resume
private transient volatile Map<PCollectionView<?>, SideInputBroadcast> broadcastHelperMap = null;

/** Type of side input. */
public enum Type {
/** for fixed inputs. */
STATIC,
/** for dynamically updated inputs. */
STREAMING
}

// Holds the Actual data of the views in serialize form
private final Map<PCollectionView<?>, Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>>> pviews =
new LinkedHashMap<>();
private final Map<PCollectionView<?>, SideInputMetadata> pviews = new LinkedHashMap<>();

// Driver only - during evaluation stage
void putPView(
public void putPView(
PCollectionView<?> view,
Iterable<WindowedValue<?>> value,
Coder<Iterable<WindowedValue<?>>> coder) {
this.putPView(view, value, coder, Type.STATIC);
}

public void putStreamingPView(
PCollectionView<?> view,
Iterable<WindowedValue<?>> value,
Coder<Iterable<WindowedValue<?>>> coder) {
this.putPView(view, value, coder, Type.STREAMING);
}

// Driver only - during evaluation stage
private void putPView(
PCollectionView<?> view,
Iterable<WindowedValue<?>> value,
Coder<Iterable<WindowedValue<?>>> coder,
Type type) {

pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
pviews.put(view, SideInputMetadata.create(CoderHelpers.toByteArray(value, coder), type, coder));

// Currently unsynchronized unpersist, if needed can be changed to blocking
if (broadcastHelperMap != null) {
@@ -90,8 +111,8 @@ SideInputBroadcast getPCollectionView(PCollectionView<?> view, JavaSparkContext

private SideInputBroadcast createBroadcastHelper(
PCollectionView<?> view, JavaSparkContext context) {
Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
final SideInputMetadata sideInputMetadata = pviews.get(view);
SideInputBroadcast helper = sideInputMetadata.toSideInputBroadcast();
String pCollectionName =
view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
LOG.debug(
@@ -432,6 +432,7 @@ public void evaluate(
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());

TupleTag<OutputT> mainOutputTag = transform.getMainOutputTag();
final boolean useStreamingSideInput = context.isStreamingSideInput();
MultiDoFnFunction<InputT, OutputT> multiDoFnFunction =
new MultiDoFnFunction<>(
metricsAccum,
@@ -447,7 +448,8 @@
stateful,
doFnSchemaInformation,
sideInputMapping,
useBoundedConcurrentOutput);
useBoundedConcurrentOutput,
useStreamingSideInput);

if (stateful) {
// Based on the fact that the signature is stateful, DoFnSignatures ensures