diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 97084cfc7ab8..52619789282b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
@@ -31,26 +32,36 @@
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
/**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
- * interface.
+ * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
- *
- * For now we support non-parallel, not checkpointed sources.
+ * For now only non-parallel sources are supported; checkpointing is work in progress.
* */
public class UnboundedSourceWrapper extends RichSourceFunction> implements Triggerable {
private final String name;
- private final UnboundedSource.UnboundedReader reader;
+ private final UnboundedSource source;
private StreamingRuntimeContext runtime = null;
private StreamSource.ManualWatermarkContext> context = null;
private volatile boolean isRunning = false;
- public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded transform) {
+ /** Transient: written and restored as JSON via Jackson in writeObject/readObject. */
+ private transient PipelineOptions pipelineOptions;
+
+ /** Transient: created from the source each time run() starts. */
+ private transient UnboundedSource.UnboundedReader reader;
+
+ public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded transform) {
this.name = transform.getName();
- this.reader = transform.getSource().createReader(options, null);
+ this.pipelineOptions = pipelineOptions;
+ this.source = transform.getSource();
}
public String getName() {
@@ -67,40 +78,51 @@ WindowedValue makeWindowedValue(T output, Instant timestamp) {
@Override
public void run(SourceContext> ctx) throws Exception {
if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
- throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
- "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+ throw new RuntimeException(
+ "We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+ "Apparently " + this.name + " is not. " +
+ "Probably you should consider writing your own Wrapper for this source.");
}
context = (StreamSource.ManualWatermarkContext>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();
- this.isRunning = true;
+ isRunning = true;
+
+ reader = source.createReader(pipelineOptions, null);
+
boolean inputAvailable = reader.start();
setNextWatermarkTimer(this.runtime);
- while (isRunning) {
-
- while (!inputAvailable && isRunning) {
- // wait a bit until we retry to pull more records
- Thread.sleep(50);
- inputAvailable = reader.advance();
- }
- if (inputAvailable) {
+ try {
- // get it and its timestamp from the source
- T item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
+ while (isRunning) {
- // write it to the output collector
- synchronized (ctx.getCheckpointLock()) {
- context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ if (!inputAvailable && isRunning) {
+ // nothing to emit yet: pause 50 ms, then ask the reader again
+ Thread.sleep(50);
+ inputAvailable = reader.advance();
}
- inputAvailable = reader.advance();
+ if (inputAvailable) {
+
+ // fetch the reader's current element together with its timestamp
+ T item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ // emit while holding the checkpoint lock, stamped with the element's timestamp in millis
+ synchronized (ctx.getCheckpointLock()) {
+ context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ }
+
+ inputAvailable = reader.advance();
+ }
}
+ } finally {
+ reader.close();
}
}
@@ -131,4 +153,19 @@ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
private long getTimeToNextWaternark(long watermarkInterval) {
return System.currentTimeMillis() + watermarkInterval;
}
+
+
+ // Writes the default fields, then the transient PipelineOptions as a framed JSON byte array.
+ private void writeObject(ObjectOutputStream out) throws IOException {
+   out.defaultWriteObject();
+   // Buffer first: ObjectMapper.writeValue(OutputStream, ...) auto-closes its target by default.
+   out.writeObject(new ObjectMapper().writeValueAsBytes(pipelineOptions));
+ }
+
+ // Restores PipelineOptions from the framed JSON bytes so Jackson cannot over-read or close the stream.
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   byte[] json = (byte[]) in.readObject();
+   pipelineOptions = new ObjectMapper().readValue(json, PipelineOptions.class);
+ }
}
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
new file mode 100644
index 000000000000..f36028e17de8
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Instant;
+import org.junit.internal.ArrayComparisonFailure;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+public class UnboundedSourceITCase extends StreamingProgramTestBase {
+
+ protected static String resultPath;
+
+ public UnboundedSourceITCase() {
+ }
+
+ static final String[] EXPECTED_RESULT = new String[]{
+ "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(resultPath);
+ }
+
+ private static void runProgram(String resultPath) {
+
+ Pipeline p = FlinkTestPipeline.createForStreaming();
+
+ PCollection result = p
+ .apply(Read.from(new RangeReadSource(1, 10)))
+ .apply(Window.into(new GlobalWindows())
+ .triggering(AfterPane.elementCountAtLeast(10))
+ .discardingFiredPanes())
+ .apply(ParDo.of(new DoFn() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element().toString());
+ }
+ }));
+
+ result.apply(TextIO.Write.to(resultPath));
+
+ try {
+ p.run();
+ fail();
+ } catch(Exception e) {
+ assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage());
+ }
+ }
+
+
+ private static class RangeReadSource extends UnboundedSource {
+
+ final int from;
+ final int to;
+
+ RangeReadSource(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+
+
+ @Override
+ public List extends UnboundedSource> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return ImmutableList.of(this);
+ }
+
+ @Override
+ public UnboundedReader createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
+ return new RangeReadReader(options);
+ }
+
+ @Nullable
+ @Override
+ public Coder getCheckpointMarkCoder() {
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public Coder getDefaultOutputCoder() {
+ return BigEndianIntegerCoder.of();
+ }
+
+ private class RangeReadReader extends UnboundedReader {
+
+ private int current;
+
+ private long watermark;
+
+ public RangeReadReader(PipelineOptions options) {
+ assertNotNull(options);
+ current = from;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ current++;
+ watermark++;
+
+ if (current >= to) {
+ try {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ throw new IOException("The source terminates as expected.");
+ } catch (IOException e) {
+ // rethrow so the IOException reaches the caller and ends the source
+ throw e;
+ } catch (Throwable t) {
+ // presumably the output is not complete yet (check failed); ignore and report no input
+ }
+ }
+ return current < to;
+ }
+
+ @Override
+ public Integer getCurrent() throws NoSuchElementException {
+ return current;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return new Instant(current);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return new Instant(watermark);
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource getCurrentSource() {
+ return RangeReadSource.this;
+ }
+ }
+ }
+}
+
+