@@ -33,7 +33,6 @@
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -63,6 +62,8 @@
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.PubsubUnboundedSink;
import org.apache.beam.sdk.io.PubsubUnboundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.ShardNameTemplate;
import org.apache.beam.sdk.io.TextIO;
@@ -107,6 +108,7 @@
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
@@ -177,6 +179,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;

/**
* A {@link PipelineRunner} that executes the operations in the
@@ -338,33 +341,46 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
this.pcollectionsRequiringIndexedFormat = new HashSet<>();
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();

ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
if (options.isStreaming()) {
overrides = ImmutableMap.<Class<?>, Class<?>>builder()
.put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class)
.put(Create.Values.class, StreamingCreate.class)
.put(View.AsMap.class, StreamingViewAsMap.class)
.put(View.AsMultimap.class, StreamingViewAsMultimap.class)
.put(View.AsSingleton.class, StreamingViewAsSingleton.class)
.put(View.AsList.class, StreamingViewAsList.class)
.put(View.AsIterable.class, StreamingViewAsIterable.class)
.put(Write.Bound.class, StreamingWrite.class)
.put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class)
.put(Read.Unbounded.class, StreamingUnboundedRead.class)
.put(Read.Bounded.class, UnsupportedIO.class)
.put(AvroIO.Read.Bound.class, UnsupportedIO.class)
.put(AvroIO.Write.Bound.class, UnsupportedIO.class)
.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class)
.put(TextIO.Read.Bound.class, UnsupportedIO.class)
.put(TextIO.Write.Bound.class, UnsupportedIO.class)
.put(Window.Bound.class, AssignWindows.class)
.build();
builder.put(Combine.GloballyAsSingletonView.class,
StreamingCombineGloballyAsSingletonView.class);
builder.put(Create.Values.class, StreamingCreate.class);
builder.put(View.AsMap.class, StreamingViewAsMap.class);
builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
builder.put(View.AsList.class, StreamingViewAsList.class);
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
builder.put(Write.Bound.class, StreamingWrite.class);
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
builder.put(Read.Bounded.class, UnsupportedIO.class);
builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
// In streaming mode, we must either use the custom Pubsub unbounded source/sink
// or defer to Windmill's built-in implementation.
builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class);
builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class);
Contributor: These are both DoFns, not PTransforms, so I do not think this will have any effect.
@kennknowles any suggestions?

Contributor (Author): I added support for this in UnsupportedIO below.

Contributor: ack
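
A note for readers following this thread: the overrides map only takes effect because DataflowPipelineRunner#apply looks up the replacement class for whatever is being applied and instantiates it reflectively. The sketch below is an assumption about that dispatch, not the runner's actual code; the one grounded detail is the (runner, overridden value) constructor shape that the "used via reflection" comments later in this diff rely on.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical sketch of the reflective override dispatch. Each replacement
// class (StreamingCreate, UnsupportedIO, ...) exposes a constructor taking
// (DataflowPipelineRunner, <overridden PTransform or DoFn>), which is located
// and invoked here. Taking Object lets the same lookup cover DoFn overrides.
static Object replaceIfOverridden(
    DataflowPipelineRunner runner, Object applied, Map<Class<?>, Class<?>> overrides)
    throws ReflectiveOperationException {
  Class<?> replacement = overrides.get(applied.getClass());
  if (replacement == null) {
    return applied; // nothing registered for this class; use it unchanged
  }
  for (Constructor<?> ctor : replacement.getDeclaredConstructors()) {
    Class<?>[] params = ctor.getParameterTypes();
    if (params.length == 2
        && params[0] == DataflowPipelineRunner.class
        && params[1].isAssignableFrom(applied.getClass())) {
      ctor.setAccessible(true); // the replacement classes here are private
      return ctor.newInstance(runner, applied);
    }
  }
  throw new IllegalStateException(
      "No (runner, value) constructor on " + replacement.getName());
}
```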

if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_source")) {
builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class);
}
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_sink")) {
builder.put(PubsubUnboundedSink.class, StreamingPubsubIOWrite.class);
}
} else {
ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
builder.put(Read.Unbounded.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
builder.put(Write.Bound.class, BatchWrite.class);
builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
// In batch mode, we must use the custom Pubsub bounded source/sink.
builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
Contributor: Ditto here. In this case, I believe it will be caught by the check above for Read.Unbounded.

Contributor (Author): Those are both plain old PTransforms. I also added overloads in UnsupportedIO below.

Contributor: ack, thanks

if (options.getExperiments() == null
|| !options.getExperiments().contains("disable_ism_side_input")) {
builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -373,8 +389,8 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
builder.put(View.AsList.class, BatchViewAsList.class);
builder.put(View.AsIterable.class, BatchViewAsIterable.class);
}
overrides = builder.build();
}
overrides = builder.build();
}
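
For context on the two experiment checks above: getExperiments() comes straight from the pipeline options, so opting in is just a command-line flag. A hypothetical invocation (the flag values are the ones tested above; everything else is illustrative):

```java
// Hypothetical: keep the custom Pubsub unbounded source in streaming mode
// instead of letting the runner swap in StreamingPubsubIORead.
DataflowPipelineOptions options = PipelineOptionsFactory
    .fromArgs(new String[] {
        "--runner=DataflowPipelineRunner",
        "--streaming=true",
        "--experiments=enable_custom_pubsub_source"})
    .as(DataflowPipelineOptions.class);
// With no --experiments flag at all, getExperiments() is null and both
// Pubsub overrides above remain active.
```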

/**
@@ -2336,27 +2352,104 @@ protected String getKindString() {
}
}

// ================================================================================
// PubsubIO translations
// ================================================================================

/**
* Specialized implementation for
* {@link org.apache.beam.sdk.io.PubsubIO.Write PubsubIO.Write} for the
* Dataflow runner in streaming mode.
*
* <p>For internal use only. Subject to change at any time.
*
* <p>Public so the {@link PubsubIOTranslator} can access.
* Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we
* can instead defer to Windmill's implementation.
*/
public static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
private final PubsubIO.Write.Bound<T> transform;
private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
private final PubsubUnboundedSource<T> transform;

/**
* Builds an instance of this class from the overridden transform.
*/
public StreamingPubsubIORead(
DataflowPipelineRunner runner, PubsubUnboundedSource<T> transform) {
this.transform = transform;
}

PubsubUnboundedSource<T> getOverriddenTransform() {
return transform;
}

@Override
public PCollection<T> apply(PBegin input) {
return PCollection.<T>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
.setCoder(transform.getElementCoder());
}

@Override
protected String getKindString() {
return "StreamingPubsubIORead";
}

static {
DataflowPipelineTranslator.registerTransformTranslator(
StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
}
}

/**
* Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
*/
private static class StreamingPubsubIOReadTranslator implements
TransformTranslator<StreamingPubsubIORead> {
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void translate(
StreamingPubsubIORead transform,
TranslationContext context) {
translateTyped(transform, context);
}

private <T> void translateTyped(
StreamingPubsubIORead<T> transform,
TranslationContext context) {
checkArgument(context.getPipelineOptions().isStreaming(),
"StreamingPubsubIORead is only for streaming pipelines.");
PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
context.addStep(transform, "ParallelRead");
Contributor: overriddenTransform or transform?

Contributor (Author): transform. Later, getInput will assert if the context step does not match the input.

context.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopic() != null) {
context.addInput(PropertyNames.PUBSUB_TOPIC,
overriddenTransform.getTopic().getV1Beta1Path());
}
if (overriddenTransform.getSubscription() != null) {
context.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION,
overriddenTransform.getSubscription().getV1Beta1Path());
}
if (overriddenTransform.getTimestampLabel() != null) {
context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
overriddenTransform.getTimestampLabel());
}
if (overriddenTransform.getIdLabel() != null) {
context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
}
}

/**
* Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
* can instead defer to Windmill's implementation.
*/
private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
private final PubsubUnboundedSink<T> transform;

/**
* Builds an instance of this class from the overridden transform.
*/
public StreamingPubsubIOWrite(
DataflowPipelineRunner runner, PubsubIO.Write.Bound<T> transform) {
DataflowPipelineRunner runner, PubsubUnboundedSink<T> transform) {
this.transform = transform;
}

public PubsubIO.Write.Bound<T> getOverriddenTransform() {
PubsubUnboundedSink<T> getOverriddenTransform() {
return transform;
}

@@ -2369,8 +2462,51 @@ public PDone apply(PCollection<T> input) {
protected String getKindString() {
return "StreamingPubsubIOWrite";
}

static {
DataflowPipelineTranslator.registerTransformTranslator(
StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator());
}
}

/**
* Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node.
*/
private static class StreamingPubsubIOWriteTranslator implements
TransformTranslator<StreamingPubsubIOWrite> {

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void translate(
StreamingPubsubIOWrite transform,
TranslationContext context) {
translateTyped(transform, context);
}

private <T> void translateTyped(
StreamingPubsubIOWrite<T> transform,
TranslationContext context) {
checkArgument(context.getPipelineOptions().isStreaming(),
"StreamingPubsubIOWrite is only for streaming pipelines.");
PubsubUnboundedSink<T> overriddenTransform = transform.getOverriddenTransform();
context.addStep(transform, "ParallelWrite");
Contributor: transform vs. overriddenTransform?

Contributor (Author): ditto

context.addInput(PropertyNames.FORMAT, "pubsub");
context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
if (overriddenTransform.getTimestampLabel() != null) {
context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
overriddenTransform.getTimestampLabel());
}
if (overriddenTransform.getIdLabel() != null) {
context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
context.addEncodingInput(
WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}

// ================================================================================

/**
* Specialized implementation for
* {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -2912,11 +3048,14 @@ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inp
}

/**
* Specialized expansion for unsupported IO transforms that throws an error.
* Specialized expansion for unsupported IO transforms and DoFns that throws an error.
*/
private static class UnsupportedIO<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
@Nullable
private PTransform<?, ?> transform;
@Nullable
private DoFn<?, ?> doFn;

/**
* Builds an instance of this class from the overridden transform.
@@ -2974,13 +3113,50 @@ public UnsupportedIO(DataflowPipelineRunner runner, TextIO.Write.Bound<?> transf
this.transform = transform;
}

/**
* Builds an instance of this class from the overridden doFn.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner,
PubsubIO.Read.Bound<?>.PubsubBoundedReader doFn) {
this.doFn = doFn;
}

/**
* Builds an instance of this class from the overridden doFn.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner,
PubsubIO.Write.Bound<?>.PubsubBoundedWriter doFn) {
this.doFn = doFn;
}

/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSource<?> transform) {
this.transform = transform;
}

/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSink<?> transform) {
this.transform = transform;
}


@Override
public OutputT apply(InputT input) {
String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
? "streaming" : "batch";
String name =
transform == null ? approximateSimpleName(doFn.getClass()) :
approximatePTransformName(transform.getClass());
Contributor: Fix the wrapping here? The ":" should be at the start of the line.

throw new UnsupportedOperationException(
String.format("The DataflowPipelineRunner in %s mode does not support %s.",
mode, approximatePTransformName(transform.getClass())));
String.format("The DataflowPipelineRunner in %s mode does not support %s.", mode, name));
}
}
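
As a usage note, here is a hypothetical pipeline that trips this override in streaming mode (the GCS path is illustrative; the transform name in the message assumes approximatePTransformName strips the Bound suffix):

```java
// Hypothetical: TextIO.Read.Bound is mapped to UnsupportedIO in streaming
// mode, so the pipeline fails at construction time, before any job submission.
Pipeline p = Pipeline.create(options); // options built with --streaming=true
p.apply(TextIO.Read.from("gs://my-bucket/input-*.txt"));
// => java.lang.UnsupportedOperationException:
//    The DataflowPipelineRunner in streaming mode does not support TextIO.Read.
```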

@@ -32,7 +32,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -41,7 +40,6 @@
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -1009,12 +1007,6 @@ private <T> void translateHelper(
///////////////////////////////////////////////////////////////////////////
// IO Translation.

registerTransformTranslator(
PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
registerTransformTranslator(
DataflowPipelineRunner.StreamingPubsubIOWrite.class,
new PubsubIOTranslator.WriteTranslator());

registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
