Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
Expand Down Expand Up @@ -131,27 +132,39 @@ private static boolean canFuseParDo(
// The PCollection's producer and this ParDo execute in different environments, so fusion
// is never possible.
return false;
} else if (!pipeline.getSideInputs(parDo).isEmpty()) {
// At execution time, a Runner is required to only provide inputs to a PTransform that, at the
// time the PTransform processes them, the associated window is ready in all side inputs that
// the PTransform consumes. For an arbitrary stage, it is significantly complex for the runner
// to determine this for each input. As a result, we break fusion to simplify this inspection.
// In general, a ParDo which consumes side inputs cannot be fused into an executable subgraph
// alongside any transforms which are upstream of any of its side inputs.
}
if (!pipeline.getSideInputs(parDo).isEmpty()) {
// At execution time, a Runner is required to only provide inputs to a PTransform that, at
// the time the PTransform processes them, the associated window is ready in all side inputs
// that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
// runner to determine this for each input. As a result, we break fusion to simplify this
// inspection. In general, a ParDo which consumes side inputs cannot be fused into an
// executable stage alongside any transforms which are upstream of any of its side inputs.
return false;
} else {
try {
ParDoPayload payload =
ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
// Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
// a key must execute serially. To avoid checking if the rest of the stage is
// key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
return false;
}
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(e);
}
}
return true;
}

/**
 * Decides whether {@code parDo} may share a stage with {@code other}.
 *
 * @param parDo the ParDo whose compatibility is being checked
 * @param other the transform it would be fused alongside
 * @param pipeline the pipeline used to look up the side inputs of {@code parDo}
 * @return {@code true} only if {@code parDo} consumes no side inputs and {@code
 *     compatibleEnvironments} accepts the pair
 */
private static boolean parDoCompatibility(
    PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
  // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
  // side inputs can be fused with other transforms in the same environment which are not
  // upstream of any of the side inputs.
  return pipeline.getSideInputs(parDo).isEmpty()
      && compatibleEnvironments(parDo, other, pipeline);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
Expand Down Expand Up @@ -241,6 +243,126 @@ public void fusesCompatibleEnvironments() {
subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo", "window"));
}

/**
 * Builds a stage rooted at {@code impulse.out} containing {@code parDo}, whose only consumer
 * declares a state spec, and checks that the stage stops at {@code parDo.out}.
 */
@Test
public void materializesWithStatefulConsumer() {
  // Pipeline shape under test:
  //   (impulse.out) -> parDo -> (parDo.out)
  //   (parDo.out) -> stateful -> stateful.out
  // "stateful" declares a state spec, which prevents it from fusing with the upstream ParDo.
  SdkFunctionSpec commonDoFn = SdkFunctionSpec.newBuilder().setEnvironmentId("common").build();
  Environment commonEnvironment = Environment.newBuilder().setUrl("common").build();
  PCollection parDoOut = PCollection.newBuilder().setUniqueName("parDo.out").build();
  PCollection statefulOut = PCollection.newBuilder().setUniqueName("stateful.out").build();

  // Upstream ParDo: no state, no timers.
  ParDoPayload plainPayload = ParDoPayload.newBuilder().setDoFn(commonDoFn).build();
  FunctionSpec plainSpec =
      FunctionSpec.newBuilder()
          .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
          .setPayload(plainPayload.toByteString())
          .build();
  PTransform upstream =
      PTransform.newBuilder()
          .putInputs("input", "impulse.out")
          .putOutputs("output", "parDo.out")
          .setSpec(plainSpec)
          .build();

  // Downstream ParDo: identical environment, but carries a single (default) state spec.
  ParDoPayload statefulPayload =
      ParDoPayload.newBuilder()
          .setDoFn(commonDoFn)
          .putStateSpecs("state", StateSpec.getDefaultInstance())
          .build();
  FunctionSpec statefulSpec =
      FunctionSpec.newBuilder()
          .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
          .setPayload(statefulPayload.toByteString())
          .build();
  PTransform statefulConsumer =
      PTransform.newBuilder()
          .putInputs("input", "parDo.out")
          .putOutputs("output", "stateful.out")
          .setSpec(statefulSpec)
          .build();

  QueryablePipeline pipeline =
      QueryablePipeline.fromComponents(
          partialComponents
              .toBuilder()
              .putTransforms("parDo", upstream)
              .putPcollections("parDo.out", parDoOut)
              .putTransforms("stateful", statefulConsumer)
              .putPcollections("stateful.out", statefulOut)
              .putEnvironments("common", commonEnvironment)
              .build());

  ExecutableStage stage =
      GreedilyFusedExecutableStage.forGrpcPortRead(
          pipeline, impulseOutputNode, ImmutableSet.of(PipelineNode.pTransform("parDo", upstream)));

  // The stage must end at parDo.out and contain only the upstream ParDo.
  assertThat(
      stage.getOutputPCollections(), contains(PipelineNode.pCollection("parDo.out", parDoOut)));
  assertThat(stage.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo"));
}

@Test
public void materializesWithConsumerWithTimer() {
// (impulse.out) -> parDo -> (parDo.out)
// (parDo.out) -> timer -> timer.out
// timer has a state spec which prevents it from fusing with an upstream ParDo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// timer has a timer spec

PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform timerTransform =
PTransform.newBuilder()
.putInputs("input", "parDo.out")
.putOutputs("output", "timer.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putTimerSpecs("timer", TimerSpec.getDefaultInstance())
.build()
.toByteString()))
.build();

QueryablePipeline p =
QueryablePipeline.fromComponents(
partialComponents
.toBuilder()
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms("timer", timerTransform)
.putPcollections(
"timer.out", PCollection.newBuilder().setUniqueName("timer.out").build())
.putEnvironments("common", Environment.newBuilder().setUrl("common").build())
.build());

ExecutableStage subgraph =
GreedilyFusedExecutableStage.forGrpcPortRead(
p,
impulseOutputNode,
ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
assertThat(
subgraph.getOutputPCollections(),
contains(
PipelineNode.pCollection(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())));
assertThat(
subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo"));
}

@Test
public void fusesFlatten() {
// (impulse.out) -> parDo -> parDo.out --> flatten -> flatten.out -> window -> window.out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.junit.Before;
Expand Down Expand Up @@ -765,6 +767,140 @@ public void sideInputRootsNewStage() {
.withTransforms("sideRead")));
}

/*
 * impulse -> .out -> parDo -> .out -> stateful -> .out
 * becomes
 * (impulse.out) -> parDo -> (parDo.out)
 * (parDo.out) -> stateful
 */
@Test
public void statefulParDoRootsStage() {
  // "stateful" declares a state spec, which prevents it from fusing with the upstream ParDo,
  // so it is expected to root a stage of its own that reads parDo.out.
  SdkFunctionSpec commonDoFn = SdkFunctionSpec.newBuilder().setEnvironmentId("common").build();
  Environment commonEnvironment = Environment.newBuilder().setUrl("common").build();
  PCollection parDoOut = PCollection.newBuilder().setUniqueName("parDo.out").build();
  PCollection statefulOut = PCollection.newBuilder().setUniqueName("stateful.out").build();

  // Upstream ParDo: no state, no timers.
  ParDoPayload plainPayload = ParDoPayload.newBuilder().setDoFn(commonDoFn).build();
  PTransform upstream =
      PTransform.newBuilder()
          .putInputs("input", "impulse.out")
          .putOutputs("output", "parDo.out")
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                  .setPayload(plainPayload.toByteString()))
          .build();

  // Downstream ParDo: same environment plus one (default) state spec.
  ParDoPayload statefulPayload =
      ParDoPayload.newBuilder()
          .setDoFn(commonDoFn)
          .putStateSpecs("state", StateSpec.getDefaultInstance())
          .build();
  PTransform statefulConsumer =
      PTransform.newBuilder()
          .putInputs("input", "parDo.out")
          .putOutputs("output", "stateful.out")
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                  .setPayload(statefulPayload.toByteString()))
          .build();

  Components components =
      partialComponents
          .toBuilder()
          .putTransforms("parDo", upstream)
          .putPcollections("parDo.out", parDoOut)
          .putTransforms("stateful", statefulConsumer)
          .putPcollections("stateful.out", statefulOut)
          .putEnvironments("common", commonEnvironment)
          .build();
  Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build();

  FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

  // Only the impulse is left for the runner to execute directly.
  assertThat(
      fused.getRunnerExecutedTransforms(),
      containsInAnyOrder(
          PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));

  // Two stages: one ending at parDo.out, one rooted at parDo.out holding only "stateful".
  ExecutableStageMatcher upstreamStage =
      ExecutableStageMatcher.withInput("impulse.out")
          .withOutputs("parDo.out")
          .withTransforms("parDo");
  ExecutableStageMatcher statefulStage =
      ExecutableStageMatcher.withInput("parDo.out").withNoOutputs().withTransforms("stateful");
  assertThat(fused.getFusedStages(), containsInAnyOrder(upstreamStage, statefulStage));
}

/*
 * impulse -> .out -> parDo -> .out -> timer -> .out
 * becomes
 * (impulse.out) -> parDo -> (parDo.out)
 * (parDo.out) -> timer
 */
@Test
public void parDoWithTimerRootsStage() {
  // "timer" declares a timer spec; the expectation below is that it roots its own stage
  // reading parDo.out instead of being fused behind the upstream ParDo.
  SdkFunctionSpec commonDoFn = SdkFunctionSpec.newBuilder().setEnvironmentId("common").build();
  Environment commonEnvironment = Environment.newBuilder().setUrl("common").build();
  PCollection parDoOut = PCollection.newBuilder().setUniqueName("parDo.out").build();
  PCollection timerOut = PCollection.newBuilder().setUniqueName("timer.out").build();

  // Upstream ParDo: no state, no timers.
  ParDoPayload plainPayload = ParDoPayload.newBuilder().setDoFn(commonDoFn).build();
  PTransform upstream =
      PTransform.newBuilder()
          .putInputs("input", "impulse.out")
          .putOutputs("output", "parDo.out")
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                  .setPayload(plainPayload.toByteString()))
          .build();

  // Downstream ParDo: same environment plus one (default) timer spec.
  ParDoPayload timerPayload =
      ParDoPayload.newBuilder()
          .setDoFn(commonDoFn)
          .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
          .build();
  PTransform timerConsumer =
      PTransform.newBuilder()
          .putInputs("input", "parDo.out")
          .putOutputs("output", "timer.out")
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                  .setPayload(timerPayload.toByteString()))
          .build();

  Components components =
      partialComponents
          .toBuilder()
          .putTransforms("parDo", upstream)
          .putPcollections("parDo.out", parDoOut)
          .putTransforms("timer", timerConsumer)
          .putPcollections("timer.out", timerOut)
          .putEnvironments("common", commonEnvironment)
          .build();
  Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build();

  FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

  // Only the impulse is left for the runner to execute directly.
  assertThat(
      fused.getRunnerExecutedTransforms(),
      containsInAnyOrder(
          PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));

  // Two stages: one ending at parDo.out, one rooted at parDo.out holding only "timer".
  ExecutableStageMatcher upstreamStage =
      ExecutableStageMatcher.withInput("impulse.out")
          .withOutputs("parDo.out")
          .withTransforms("parDo");
  ExecutableStageMatcher timerStage =
      ExecutableStageMatcher.withInput("parDo.out").withNoOutputs().withTransforms("timer");
  assertThat(fused.getFusedStages(), containsInAnyOrder(upstreamStage, timerStage));
}

/*
* impulse -> .out -> ( read -> .out --> goTransform -> .out )
* \
Expand Down