@@ -18,8 +18,10 @@

package org.apache.beam.runners.flink.translation;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import org.apache.beam.runners.flink.translation.functions.UnionCoder;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.runners.flink.translation.wrappers.streaming.*;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
@@ -72,6 +74,7 @@ public class FlinkStreamingTransformTranslators {
// here you can find all the available translators.
static {
TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
@@ -103,7 +106,7 @@ public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslatio
// in the FlatMap function using the Coder.

List<byte[]> serializedElements = Lists.newArrayList();
Coder<OUT> elementCoder = context.getOutput(transform).getCoder();
Coder<OUT> elementCoder = output.getCoder();
for (OUT element: elements) {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
try {
@@ -126,7 +129,7 @@ public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslatio
DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
.returns(outputType);

context.setOutputDataStream(context.getOutput(transform), outputDataStream);
context.setOutputDataStream(output, outputDataStream);
Contributor: could also swap the call to getOutput at L109. [GitHub won't let me comment there.]

Contributor Author: Yes, done.

}
}

@@ -164,6 +167,40 @@ public void flatMap(WindowedValue<T> value, Collector<String> out) throws Except
}
}

private static class BoundedReadSourceTranslator<T>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {

@Override
public void translateNode(Read.Bounded<T> transform, FlinkStreamingTranslationContext context) {

BoundedSource<T> boundedSource = transform.getSource();
PCollection<T> output = context.getOutput(transform);

Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder();
CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder);

DataStream<T> source = context.getExecutionEnvironment().createInput(
new SourceInputFormat<>(
boundedSource,
context.getPipelineOptions()),
typeInfo);

DataStream<WindowedValue<T>> windowedStream = source.flatMap(
new FlatMapFunction<T, WindowedValue<T>>() {
@Override
public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
out.collect(
WindowedValue.of(value,
BoundedWindow.TIMESTAMP_MIN_VALUE,
GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING));
}
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
Contributor: I know nothing here, just want to confirm that it's okay to use an "IngestionTimeExtractor" for a collection where all elements have timestamps of MIN_VALUE.

context.setOutputDataStream(output, windowedStream);
}
}

private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {

@Override
@@ -172,19 +209,26 @@ public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslation

DataStream<WindowedValue<T>> source;
if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
@SuppressWarnings("unchecked")
UnboundedFlinkSource<T> flinkSource = (UnboundedFlinkSource<T>) transform.getSource();
source = context.getExecutionEnvironment()
.addSource(flinkSource.getFlinkSource())
.flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
.flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
@Override
public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
collector.collect(
WindowedValue.of(
s,
Instant.now(),
GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING));
}
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
Contributor: Same questions as above w.r.t. timestamps.

Contributor Author: SourceFunctions don't have to assign timestamps or emit watermarks when we are using Event Time. In Flink, you can always assign them later. In the Beam runner we default to Event Time, which makes it tricky for us because the user can't actually extract watermarks/timestamps via the Flink API. Thus, we defaulted to Ingestion Time here to give users a good out-of-the-box experience, e.g. when they read data from Kafka. We are currently reworking some of the provided sources to emit timestamps/watermarks directly in the source.

Note that this limitation applies only to Flink wrapped sources. The general wrapper for unbounded sources emits watermarks and timestamped values.

} else {
source = context.getExecutionEnvironment()
.addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
}

context.setOutputDataStream(output, source);
}
}
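Regarding the timestamp discussion above: a minimal sketch, assuming Flink's SourceFunction API, of what a source that emits event-time timestamps and watermarks itself could look like, instead of leaving elements unstamped and re-stamping them downstream with an IngestionTimeExtractor. The class name and the use of wall-clock time as the event time are illustrative only, not part of this change.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical sketch: a Flink source that assigns event-time timestamps and
// emits watermarks directly, so no downstream timestamp extractor is needed.
public class SelfTimestampingSource implements SourceFunction<Long> {

  private volatile boolean running = true;

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    long value = 0;
    while (running && value < 100) {
      // In a real source this would be the record's own event time
      // (e.g. a Kafka record timestamp); wall-clock time is a stand-in here.
      long eventTime = System.currentTimeMillis();
      synchronized (ctx.getCheckpointLock()) {
        ctx.collectWithTimestamp(value, eventTime);
        ctx.emitWatermark(new Watermark(eventTime));
      }
      value++;
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}
```

With a source like this, the trailing `.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>())` call would no longer be necessary.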
@@ -18,24 +18,23 @@
package org.apache.beam.runners.flink.translation.wrappers;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Source;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;


/**
* A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
* Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
@@ -44,37 +43,40 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);

private final BoundedSource<T> initialSource;

private transient PipelineOptions options;
private final byte[] serializedOptions;

private BoundedSource.BoundedReader<T> reader = null;
private boolean reachedEnd = true;
private transient BoundedSource.BoundedReader<T> reader = null;
private boolean inputAvailable = true;

public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
this.initialSource = initialSource;
this.options = options;
}

private void writeObject(ObjectOutputStream out)
throws IOException, ClassNotFoundException {
out.defaultWriteObject();
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(out, options);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
new ObjectMapper().writeValue(baos, options);
serializedOptions = baos.toByteArray();
} catch (Exception e) {
throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
}

private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
ObjectMapper mapper = new ObjectMapper();
options = mapper.readValue(in, PipelineOptions.class);
}

@Override
public void configure(Configuration configuration) {}
public void configure(Configuration configuration) {
try {
options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
}
}

@Override
public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
reachedEnd = false;
inputAvailable = reader.start();
}

@Override
@@ -86,7 +88,6 @@ public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOExce
@Override
public long getTotalInputSize() {
return estimatedSize;

}

@Override
@@ -109,51 +110,40 @@ public float getAverageRecordWidth() {
@Override
@SuppressWarnings("unchecked")
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
long desiredSizeBytes;
try {
desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
options);
List<SourceInputSplit<T>> splits = new ArrayList<>();
int splitCount = 0;
for (Source<T> shard: shards) {
splits.add(new SourceInputSplit<>(shard, splitCount++));
long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options);
int numShards = shards.size();
SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
for (int i = 0; i < numShards; i++) {
sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
}
return splits.toArray(new SourceInputSplit[splits.size()]);
return sourceInputSplits;
} catch (Exception e) {
throw new IOException("Could not create input splits from Source.", e);
}
}

@Override
public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
return new InputSplitAssigner() {
private int index = 0;
private final SourceInputSplit[] splits = sourceInputSplits;
@Override
public InputSplit getNextInputSplit(String host, int taskId) {
if (index < splits.length) {
return splits[index++];
} else {
return null;
}
}
};
return new DefaultInputSplitAssigner(sourceInputSplits);
}


@Override
public boolean reachedEnd() throws IOException {
return reachedEnd;
return !inputAvailable;
}

@Override
public T nextRecord(T t) throws IOException {

reachedEnd = !reader.advance();
if (!reachedEnd) {
return reader.getCurrent();
if (inputAvailable) {
final T current = reader.getCurrent();
// advance reader to have a record ready next time
inputAvailable = reader.advance();
return current;
}

return null;
Contributor: Are you using null to mean "no element"? What about collections or sources that might produce null elements as a valid value? (This is why we use the NoSuchElementException.)

Contributor Author: I agree that this doesn't look good, but it is the behavior of the InputFormat in Flink. A null signals that the current input split has been read. Flink doesn't programmatically support null values, which was a little tricky when implementing the runner.

}

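To make the null-vs-NoSuchElementException discussion above concrete, here is a simplified, hypothetical sketch of the read loop a Flink task drives around an InputFormat (not Flink's actual DataSourceTask code; the class and method names are made up). Termination is decided by reachedEnd(), and a null from nextRecord() is simply skipped, which is why the wrapper can use null to signal an exhausted split, whereas Beam readers signal that with a NoSuchElementException from getCurrent().

```java
import java.io.IOException;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.util.Collector;

// Hypothetical helper illustrating the InputFormat read contract.
public class InputFormatReadLoop {

  public static <T, S extends InputSplit> void readSplit(
      InputFormat<T, S> format, S split, Collector<T> out) throws IOException {
    format.configure(new Configuration());
    format.open(split);
    try {
      T reuse = null;
      while (!format.reachedEnd()) {
        T record = format.nextRecord(reuse);
        if (record != null) {
          // a non-null record is forwarded downstream
          out.collect(record);
        }
        // a null record is ignored; the loop stops once reachedEnd()
        // reports that the split is exhausted
      }
    } finally {
      format.close();
    }
  }
}
```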
@@ -145,12 +145,12 @@ public void trigger(long timestamp) throws Exception {
private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
if (this.isRunning) {
long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
runtime.registerTimer(timeToNextWatermark, this);
}
}

private long getTimeToNextWaternark(long watermarkInterval) {
private long getTimeToNextWatermark(long watermarkInterval) {
return System.currentTimeMillis() + watermarkInterval;
}
