From 49271b79b1c97c760a2852917a1d421f7f7d4e58 Mon Sep 17 00:00:00 2001
From: Maximilian Michels
Date: Wed, 23 Mar 2016 16:18:04 +0100
Subject: [PATCH 1/3] [BEAM-144] solve reader serialization issue
Now, we initialize the UnboundedSourceReader at runtime which requires
us to keep a copy of the PipelineOptions. This should be fine here
because we are at the lowest point of the execution stack.
---
.../streaming/io/UnboundedSourceWrapper.java | 87 +++++++++++++------
1 file changed, 62 insertions(+), 25 deletions(-)
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 97084cfc7ab8..0f1aae670dff 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
@@ -31,26 +32,36 @@
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
/**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
- * interface.
+ * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
- *
- * For now we support non-parallel, not checkpointed sources.
+ * For now we only support non-parallel sources.
* */
public class UnboundedSourceWrapper extends RichSourceFunction> implements Triggerable {
private final String name;
- private final UnboundedSource.UnboundedReader reader;
+ private final UnboundedSource source;
private StreamingRuntimeContext runtime = null;
private StreamSource.ManualWatermarkContext> context = null;
private volatile boolean isRunning = false;
- public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded transform) {
+ /** Serialized using custom Java serialization via Jackson */
+ private transient PipelineOptions pipelineOptions;
+
+ /** Instantiated during runtime **/
+ private transient UnboundedSource.UnboundedReader reader;
+
+ public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded transform) {
this.name = transform.getName();
- this.reader = transform.getSource().createReader(options, null);
+ this.pipelineOptions = pipelineOptions;
+ this.source = transform.getSource();
}
public String getName() {
@@ -67,40 +78,51 @@ WindowedValue makeWindowedValue(T output, Instant timestamp) {
@Override
public void run(SourceContext> ctx) throws Exception {
if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
- throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
- "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+ throw new RuntimeException(
+ "We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+ "Apparently " + this.name + " is not. " +
+ "Probably you should consider writing your own Wrapper for this source.");
}
context = (StreamSource.ManualWatermarkContext>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();
- this.isRunning = true;
+ isRunning = true;
+
+ reader = source.createReader(pipelineOptions, null);
+
boolean inputAvailable = reader.start();
setNextWatermarkTimer(this.runtime);
- while (isRunning) {
-
- while (!inputAvailable && isRunning) {
- // wait a bit until we retry to pull more records
- Thread.sleep(50);
- inputAvailable = reader.advance();
- }
- if (inputAvailable) {
+ try {
- // get it and its timestamp from the source
- T item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
+ while (isRunning) {
- // write it to the output collector
- synchronized (ctx.getCheckpointLock()) {
- context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ if (!inputAvailable && isRunning) {
+ // wait a bit until we retry to pull more records
+ Thread.sleep(50);
+ inputAvailable = reader.advance();
}
- inputAvailable = reader.advance();
+ if (inputAvailable) {
+
+ // get it and its timestamp from the source
+ T item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ // write it to the output collector
+ synchronized (ctx.getCheckpointLock()) {
+ context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ }
+
+ inputAvailable = reader.advance();
+ }
}
+ } finally {
+ reader.close();
}
}
@@ -131,4 +153,19 @@ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
private long getTimeToNextWaternark(long watermarkInterval) {
return System.currentTimeMillis() + watermarkInterval;
}
+
+
+ // Special serialization of the PipelineOptions necessary to instantiate the reader.
+ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
+ out.defaultWriteObject();
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.writeValue(out, pipelineOptions);
+ }
+
+ // Special deserialization of the PipelineOptions necessary to instantiate the reader.
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ ObjectMapper mapper = new ObjectMapper();
+ pipelineOptions = mapper.readValue(in, PipelineOptions.class);
+ }
}
From 3fcf511495cae783c09d4e7a8eb985128c755eff Mon Sep 17 00:00:00 2001
From: Maximilian Michels
Date: Wed, 23 Mar 2016 16:19:27 +0100
Subject: [PATCH 2/3] [BEAM-143] [flink] add test for UnboundedSourceWrapper
The test ensures serialization and execution of the wrapper works as
expected.
---
.../streaming/UnboundedSourceITCase.java | 203 ++++++++++++++++++
1 file changed, 203 insertions(+)
create mode 100644 runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
new file mode 100644
index 000000000000..2d9f10908a77
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Instant;
+import org.junit.internal.ArrayComparisonFailure;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class UnboundedSourceITCase extends StreamingProgramTestBase {
+
+ protected static String resultPath;
+
+ public UnboundedSourceITCase() {
+ }
+
+ static final String[] EXPECTED_RESULT = new String[]{
+ "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(resultPath);
+ }
+
+ private static void runProgram(String resultPath) {
+
+ Pipeline p = FlinkTestPipeline.createForStreaming();
+
+ PCollection result = p
+ .apply(Read.from(new RangeReadSource(1, 10)))
+ .apply(Window.into(new GlobalWindows())
+ .triggering(AfterPane.elementCountAtLeast(10))
+ .discardingFiredPanes())
+ .apply(ParDo.of(new DoFn() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element().toString());
+ }
+ }));
+
+ result.apply(TextIO.Write.to(resultPath));
+
+ try {
+ p.run();
+ } catch(Exception e) {
+ assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage());
+ }
+ }
+
+
+ private static class RangeReadSource extends UnboundedSource {
+
+ final int from;
+ final int to;
+
+ RangeReadSource(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+
+
+ @Override
+ public List extends UnboundedSource> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return null;
+ }
+
+ @Override
+ public UnboundedReader createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
+ return new RangeReadReader(options);
+ }
+
+ @Nullable
+ @Override
+ public Coder getCheckpointMarkCoder() {
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public Coder getDefaultOutputCoder() {
+ return BigEndianIntegerCoder.of();
+ }
+
+ private class RangeReadReader extends UnboundedReader {
+
+ private int current;
+
+ public RangeReadReader(PipelineOptions options) {
+ assertNotNull(options);
+ current = from;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ current++;
+ if (current >= to) {
+ try {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ throw new IOException("The source terminates as expected.");
+ } catch (IOException e) {
+ // pass on the exception to terminate the source
+ throw e;
+ } catch (Throwable t) {
+ // expected here from the file check
+ }
+ }
+ return current < to;
+ }
+
+ @Override
+ public Integer getCurrent() throws NoSuchElementException {
+ return current;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource getCurrentSource() {
+ return RangeReadSource.this;
+ }
+ }
+ }
+}
+
+
From 8e362fbdf1b42f18572974743ff775c3946f1ae0 Mon Sep 17 00:00:00 2001
From: Maximilian Michels
Date: Wed, 23 Mar 2016 18:46:11 +0100
Subject: [PATCH 3/3] address PR comments
---
.../streaming/io/UnboundedSourceWrapper.java | 2 +-
.../flink/streaming/UnboundedSourceITCase.java | 13 ++++++++++---
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 0f1aae670dff..52619789282b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -40,7 +40,7 @@
* A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
* {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
- * For now we only support non-parallel sources.
+ * For now we only support non-parallel sources, checkpointing is WIP.
* */
public class UnboundedSourceWrapper extends RichSourceFunction> implements Triggerable {
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
index 2d9f10908a77..f36028e17de8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
@@ -35,6 +35,7 @@
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.joda.time.Instant;
@@ -47,6 +48,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
public class UnboundedSourceITCase extends StreamingProgramTestBase {
@@ -94,6 +96,7 @@ public void processElement(ProcessContext c) throws Exception {
try {
p.run();
+ fail();
} catch(Exception e) {
assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage());
}
@@ -114,7 +117,7 @@ private static class RangeReadSource extends UnboundedSource> generateInitialSplits(
int desiredNumSplits, PipelineOptions options) throws Exception {
- return null;
+ return ImmutableList.of(this);
}
@Override
@@ -141,6 +144,8 @@ private class RangeReadReader extends UnboundedReader {
private int current;
+ private long watermark;
+
public RangeReadReader(PipelineOptions options) {
assertNotNull(options);
current = from;
@@ -154,6 +159,8 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
current++;
+ watermark++;
+
if (current >= to) {
try {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
@@ -175,7 +182,7 @@ public Integer getCurrent() throws NoSuchElementException {
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
- return Instant.now();
+ return new Instant(current);
}
@Override
@@ -184,7 +191,7 @@ public void close() throws IOException {
@Override
public Instant getWatermark() {
- return Instant.now();
+ return new Instant(watermark);
}
@Override