4 changes: 2 additions & 2 deletions runners/flink/1.6/build.gradle
@@ -21,9 +21,9 @@ def basePath = '..'
/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.6.2'
flink_version = '1.6.3'
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
main_source_dirs = ["$basePath/src/main/java", './src/main/java']
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions runners/flink/1.7/build.gradle
@@ -21,9 +21,9 @@ def basePath = '..'
/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.7.1'
flink_version = '1.7.2'
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
main_source_dirs = ["$basePath/src/main/java", "$basePath/1.6/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
4 changes: 2 additions & 2 deletions runners/flink/build.gradle
@@ -21,8 +21,8 @@ project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.5.6'
// Look for the source code in the current module
main_source_dirs = ['./src/main/java']
test_source_dirs = ['./src/test/java']
main_source_dirs = ['./src/main/java', './src/1.5/main/java']
test_source_dirs = ['./src/test/java', './src/1.5/test/java']
main_resources_dirs = ['./src/main/resources']
test_resources_dirs = ['./src/test/resources']
}
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;
package org.apache.beam.runners.flink.translation.wrapper.streaming;

import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsTest;
@@ -40,6 +40,7 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
@@ -117,7 +118,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private transient FlinkMetricContainer flinkMetricContainer;
private transient long backupWatermarkHold = Long.MIN_VALUE;

/** Constructor. */
/** This is a constructor, please see the class description above. */
public ExecutableStageDoFnOperator(
String stepName,
Coder<WindowedValue<InputT>> windowedInputCoder,
@@ -505,7 +506,7 @@ private static class SdkHarnessDoFnRunner<InputT, OutputT>
private final StageBundleFactory stageBundleFactory;
private final StateRequestHandler stateRequestHandler;
private final BundleProgressHandler progressHandler;
private final BufferedOutputManager<OutputT> outputManager;
private final DoFnRunners.OutputManager outputManager;
private final Map<String, TupleTag<?>> outputMap;
/** Timer Output Pcollection id => TimerSpec. */
private final Map<String, TimerSpec> timerOutputIdToSpecMap;
@@ -528,7 +529,7 @@ public SdkHarnessDoFnRunner(
StageBundleFactory stageBundleFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler,
BufferedOutputManager<OutputT> outputManager,
DoFnRunners.OutputManager outputManager,
Map<String, TupleTag<?>> outputMap,
Coder<BoundedWindow> windowCoder,
KeySelector<WindowedValue<InputT>, ?> keySelector,
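
Note: the change above narrows the outputManager field and constructor parameter from the concrete BufferedOutputManager<OutputT> to the DoFnRunners.OutputManager interface, so SdkHarnessDoFnRunner only depends on the ability to emit a windowed value to a tagged output. As a rough illustration of what that buys (the class below is a hypothetical stand-in for this note, not code from the PR), any implementation of the interface can now be supplied, for example in a test:

import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Hypothetical stand-in, not part of this PR: a minimal DoFnRunners.OutputManager
 * that only prints what it receives. Because SdkHarnessDoFnRunner now asks for the
 * interface, either the operator's BufferedOutputManager or a simple implementation
 * like this one can be handed to its constructor.
 */
class PrintingOutputManager implements DoFnRunners.OutputManager {
  @Override
  public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
    // Print instead of forwarding to a downstream Flink operator.
    System.out.println(tag.getId() + " -> " + value.getValue());
  }
}
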
@@ -27,6 +27,7 @@

import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
@@ -1163,17 +1164,29 @@ public void finishBundle(FinishBundleContext context) {
testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c")));

// draw a snapshot
boolean bundleFinishedBeforeSnapshot = callPrepareSnapshotPreBarrier(doFnOperator);
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

// There is a finishBundle in snapshot()
// Elements will be buffered as part of finishing a bundle in snapshot()
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("a"),
WindowedValue.valueInGlobalWindow("b"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("c")));
if (bundleFinishedBeforeSnapshot) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("a"),
WindowedValue.valueInGlobalWindow("b"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("c"),
WindowedValue.valueInGlobalWindow("finishBundle")));
} else {
// Elements will be buffered as part of finishing a bundle in snapshot()
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("a"),
WindowedValue.valueInGlobalWindow("b"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("c")));
}

testHarness.close();

@@ -1209,38 +1222,65 @@ public void finishBundle(FinishBundleContext context) {
// check finishBundle by timeout
newHarness.setProcessingTime(10);

assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));
if (bundleFinishedBeforeSnapshot) {
assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));
} else {
assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));
}

// A final bundle will be created when sending the MAX watermark
newHarness.close();

assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
if (bundleFinishedBeforeSnapshot) {
assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
} else {
assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
}

// close() will also call dispose(), but call again to verify no new bundle
// is created afterwards
newDoFnOperator.dispose();

assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
if (bundleFinishedBeforeSnapshot) {
assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
} else {
assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
}
}

@Test
@SuppressWarnings("unchecked")
public void testBundleKeyed() throws Exception {

StringUtf8Coder keyCoder = StringUtf8Coder.of();
@@ -1303,17 +1343,29 @@ public void finishBundle(FinishBundleContext context) {
new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));

// Take a snapshot
boolean bundleFinishedBeforeSnapshot = callPrepareSnapshotPreBarrier(doFnOperator);
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

// There is a finishBundle in snapshot()
// Elements will be buffered as part of finishing a bundle in snapshot()
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
if (bundleFinishedBeforeSnapshot) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "c")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
} else {
// Elements will be buffered as part of finishing a bundle in snapshot()
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
}

testHarness.close();

@@ -1351,13 +1403,21 @@ public void finishBundle(FinishBundleContext context) {
// check finishBundle by timeout
testHarness.setProcessingTime(10);

assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
// The first finishBundle is restored from the checkpoint
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
if (bundleFinishedBeforeSnapshot) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
} else {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
// The first finishBundle is restored from the checkpoint
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
}

testHarness.close();
}
@@ -1479,4 +1539,22 @@ private <T> WindowedValue<T> valueInWindow(T value, Instant timestamp, BoundedWi
private interface TestHarnessFactory<T> {
T create() throws Exception;
}

/**
* Flink version >=1.6 provides a hook for performing an action before the snapshot barrier is
* emitted to downstream operators. This avoids buffering elements that are emitted while
* finalizing the bundle in the snapshot method.
*/
private static boolean callPrepareSnapshotPreBarrier(DoFnOperator doFnOperator) throws Exception {
Method prepareSnapshotPreBarrier;
try {
prepareSnapshotPreBarrier =
doFnOperator.getClass().getMethod("prepareSnapshotPreBarrier", long.class);
prepareSnapshotPreBarrier.invoke(doFnOperator, 0L);
return true;
} catch (NoSuchMethodException e) {
// That's OK; the hook is not available in this Flink version (pre-1.6).
return false;
}
}
}
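
Note: the helper above probes for prepareSnapshotPreBarrier(long) reflectively because the method only exists on Flink stream operators from version 1.6 onwards. For context, here is a rough, self-contained sketch (the operator class and its bundle list are invented for illustration, not taken from this PR) of how an operator can use that hook to flush a pending bundle before the checkpoint barrier is forwarded:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Illustrative sketch only (not this PR's DoFnOperator): buffers elements into a
 * "bundle" and flushes it in prepareSnapshotPreBarrier, so the elements reach
 * downstream operators before the checkpoint barrier instead of having to be
 * buffered by snapshotState().
 */
class BundleFlushingOperator extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {

  private final List<String> bundle = new ArrayList<>();

  @Override
  public void processElement(StreamRecord<String> element) {
    bundle.add(element.getValue());
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // Finish the open "bundle" before the barrier is emitted downstream.
    for (String value : bundle) {
      output.collect(new StreamRecord<>(value));
    }
    bundle.clear();
  }
}
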
@@ -102,4 +102,11 @@
<suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImplTest\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamValuesRel\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamEnumerableConverterTest\.java" />

<!-- Flink exceptions -->
<!-- These prevent "Missing/Duplicate package-info.java file" errors when using multiple source directories -->
<suppress checks="JavadocPackage" files=".*/DoFnOperator\.java" />
<suppress checks="JavadocPackage" files=".*/FlinkSplitStateInternals\.java" />
<suppress checks="JavadocPackage" files=".*/FlinkSplitStateInternalsTest\.java" />

</suppressions>