Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume
* input after the upstream transform has produced and committed output.
*/
public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>();
private Collection<PCollectionView<?>> views = new ArrayList<>();
Expand All @@ -51,13 +51,14 @@ public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
private boolean finalized = false;

@Override
public void enterCompositeTransform(TransformTreeNode node) {
public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
checkState(
!finalized,
"Attempting to traverse a pipeline (node %s) with a %s "
+ "which has already visited a Pipeline and is finalized",
node.getFullName(),
ConsumerTrackingPipelineVisitor.class.getSimpleName());
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
Expand All @@ -73,7 +74,7 @@ public void leaveCompositeTransform(TransformTreeNode node) {
}

@Override
public void visitTransform(TransformTreeNode node) {
public void visitPrimitiveTransform(TransformTreeNode node) {
toFinalize.removeAll(node.getInput().expand());
AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
stepNames.put(appliedTransform, genStepName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ private KeyedPValueTrackingVisitor(
}

@Override
public void enterCompositeTransform(TransformTreeNode node) {
public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
checkState(
!finalized,
"Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
KeyedPValueTrackingVisitor.class.getSimpleName(),
node);
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
Expand All @@ -79,7 +80,7 @@ public void leaveCompositeTransform(TransformTreeNode node) {
}

@Override
public void visitTransform(TransformTreeNode node) {}
public void visitPrimitiveTransform(TransformTreeNode node) {}

@Override
public void visitValue(PValue value, TransformTreeNode producer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {

private int depth = 0;

/**
* Composite transform that we want to translate before proceeding with other transforms.
*/
private PTransform<?, ?> currentCompositeTransform;

public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
this.batchContext = new FlinkBatchTranslationContext(env, options);
}
Expand All @@ -57,54 +52,33 @@ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions op
// --------------------------------------------------------------------------------------------

@Override
public void enterCompositeTransform(TransformTreeNode node) {
public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));

PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == null) {

BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator != null) {
currentCompositeTransform = transform;
if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
// we can only optimize CoGroupByKey for input size 2
currentCompositeTransform = null;
}
}
BatchTransformTranslator<?> translator = getTranslator(node);

if (translator != null) {
applyBatchTransform(node.getTransform(), node, translator);
LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
} else {
this.depth++;
return CompositeBehavior.ENTER_TRANSFORM;
}
this.depth++;
}

@Override
public void leaveCompositeTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == transform) {

BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator != null) {
LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
applyBatchTransform(transform, node, translator);
currentCompositeTransform = null;
} else {
throw new IllegalStateException("Attempted to translate composite transform " +
"but no translator was found: " + currentCompositeTransform);
}
}
this.depth--;
LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
}

@Override
public void visitTransform(TransformTreeNode node) {
LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
if (currentCompositeTransform != null) {
// ignore it
return;
}
public void visitPrimitiveTransform(TransformTreeNode node) {
LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + formatNodeName(node));

// get the transformation corresponding to hte node we are
// get the transformation corresponding to the node we are
// currently visiting and translate it into its Flink alternative.

PTransform<?, ?> transform = node.getTransform();
BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator == null) {
Expand All @@ -114,11 +88,6 @@ public void visitTransform(TransformTreeNode node) {
applyBatchTransform(transform, node, translator);
}

@Override
public void visitValue(PValue value, TransformTreeNode producer) {
// do nothing here
}

private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {

@SuppressWarnings("unchecked")
Expand All @@ -140,6 +109,32 @@ public interface BatchTransformTranslator<Type extends PTransform> {
void translateNode(Type transform, FlinkBatchTranslationContext context);
}

/**
* Returns a translator for the given node, if it is possible, otherwise null.
*/
private static BatchTransformTranslator<?> getTranslator(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();

// Root of the graph is null
if (transform == null) {
return null;
}

BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);

// No translator known
if (translator == null) {
return null;
}

// We actually only specialize CoGroupByKey when exactly 2 inputs
if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
return null;
}

return translator;
}

private static String genSpaces(int n) {
String s = "";
for (int i = 0; i < n; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
* {@link org.apache.flink.api.java.DataSet} (for batch) one.
*/
public abstract class FlinkPipelineTranslator implements Pipeline.PipelineVisitor {
public abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {

public void translate(Pipeline pipeline) {
pipeline.traverseTopologically(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {

private int depth = 0;

/** Composite transform that we want to translate before proceeding with other transforms. */
private PTransform<?, ?> currentCompositeTransform;

public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
this.streamingContext = new FlinkStreamingTranslationContext(env, options);
}
Expand All @@ -55,47 +52,31 @@ public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, Pipeline
// --------------------------------------------------------------------------------------------

@Override
public void enterCompositeTransform(TransformTreeNode node) {
public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));

PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == null) {

if (transform != null) {
StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
if (translator != null) {
currentCompositeTransform = transform;
applyStreamingTransform(transform, node, translator);
LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
}
}
this.depth++;
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
public void leaveCompositeTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == transform) {

StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
if (translator != null) {
LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
applyStreamingTransform(transform, node, translator);
currentCompositeTransform = null;
} else {
throw new IllegalStateException("Attempted to translate composite transform " +
"but no translator was found: " + currentCompositeTransform);
}
}
this.depth--;
LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
}

@Override
public void visitTransform(TransformTreeNode node) {
LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
if (currentCompositeTransform != null) {
// ignore it
return;
}

public void visitPrimitiveTransform(TransformTreeNode node) {
LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + formatNodeName(node));
// get the transformation corresponding to hte node we are
// currently visiting and translate it into its Flink alternative.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,17 +680,18 @@ public void visitValue(PValue value, TransformTreeNode producer) {
}

@Override
public void visitTransform(TransformTreeNode node) {
public void visitPrimitiveTransform(TransformTreeNode node) {
if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
}
}

@Override
public void enterCompositeTransform(TransformTreeNode node) {
public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
}
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void addCollectionToSingletonOutput(String name,
/**
* Translates a Pipeline into the Dataflow representation.
*/
class Translator implements PipelineVisitor, TranslationContext {
class Translator extends PipelineVisitor.Defaults implements TranslationContext {
/** The Pipeline to translate. */
private final Pipeline pipeline;

Expand Down Expand Up @@ -493,16 +493,13 @@ public String getFullName(PTransform<?, ?> transform) {
return currentTransform;
}

@Override
public void enterCompositeTransform(TransformTreeNode node) {
}

@Override
public void leaveCompositeTransform(TransformTreeNode node) {
}

@Override
public void visitTransform(TransformTreeNode node) {
public void visitPrimitiveTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
TransformTranslator translator =
getTransformTranslator(transform.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
Expand Down Expand Up @@ -820,26 +819,15 @@ public void translate(
}

/** Records all the composite transforms visited within the Pipeline. */
private static class CompositeTransformRecorder implements PipelineVisitor {
private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
private List<PTransform<?, ?>> transforms = new ArrayList<>();

@Override
public void enterCompositeTransform(TransformTreeNode node) {
public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
if (node.getTransform() != null) {
transforms.add(node.getTransform());
}
}

@Override
public void leaveCompositeTransform(TransformTreeNode node) {
}

@Override
public void visitTransform(TransformTreeNode node) {
}

@Override
public void visitValue(PValue value, TransformTreeNode producer) {
return CompositeBehavior.ENTER_TRANSFORM;
}

public List<PTransform<?, ?>> getCompositeTransforms() {
Expand Down
Loading