} containing all inputs.
+ *
+ * For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
+ * is expected to crash!
+ *
+ * This is copied from the Dataflow runner code.
+ *
+ * @param <T> the type of elements to concatenate.
*/
- private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
@Override
- public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
- GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
- TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
- Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
- GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
- new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
}
}
- private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> {
+
+ private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Combine.PerKey<K, InputT, OutputT>> {
@Override
- public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ @SuppressWarnings("unchecked")
+ public void translateNode(
+ Combine.PerKey<K, InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- @SuppressWarnings("unchecked")
- Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
+ CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
+ (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
- KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+ KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
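+ // determine the coder for the CombineFn's intermediate accumulator values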
+ Coder<AccumT> accumulatorCoder;
- Coder<VA> accumulatorCoder =
- null;
try {
- accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ accumulatorCoder =
+ combineFn.getAccumulatorCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry(),
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
- e.printStackTrace();
- // TODO
+ throw new RuntimeException(e);
}
- TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder);
- TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder));
+ WindowingStrategy<?, ?> windowingStrategy =
+ context.getInput(transform).getWindowingStrategy();
+
+ TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+ new KvCoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ inputCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
+
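+ // type of the partially combined values: KV<K, AccumT>, still in their windows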
+ TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
+ new KvCoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+ windowingStrategy.getWindowFn().windowCoder()));
+
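+ // the KvCoderTypeInformation exposes a logical "key" field that the
+ // expression keys below can group on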
+ Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+ new UnsortedGrouping<>(
+ inputDataSet,
+ new Keys.ExpressionKeys<>(new String[]{"key"},
+ kvCoderTypeInformation));
+
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
- Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+ if (windowingStrategy.getWindowFn().isNonMerging()) {
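+ // non-merging windows: elements stay in their assigned windows, so we
+ // can do a pre-shuffle combine (partial reduce) followed by a final reduce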
+ WindowingStrategy<?, BoundedWindow> boundedStrategy =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+ FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+ new FlinkPartialReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+ new FlinkReduceFunction<>(
+ combineFn,
+ boundedStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ // Partially GroupReduce the values into the intermediate format AccumT (combine)
+ GroupCombineOperator<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<KV<K, AccumT>>> groupCombine =
+ new GroupCombineOperator<>(
+ inputGrouping,
+ partialReduceTypeInfo,
+ partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
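+ // make the side inputs available to the partial combine (the batch
+ // runner distributes them as Flink broadcast sets)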
+ transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+ new UnsortedGrouping<>(
+ groupCombine,
+ new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+
+ } else {
+ if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+ throw new UnsupportedOperationException(
+ "Merging WindowFn with windows other than IntervalWindow are not supported.");
+ }
- FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
+ // for merging windows we can't do a pre-shuffle combine step since
+ // elements would not be in their correct windows for side-input access
- // Partially GroupReduce the values into the intermediate format VA (combine)
- GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
- new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction,
- "GroupCombine: " + transform.getName());
+ WindowingStrategy<?, IntervalWindow> intervalStrategy =
+ (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
- // Reduce fully to VO
- GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
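+ // a single reduce step both merges the windows and combines the values,
+ // since no separate pre-shuffle combine is possible here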
+ FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+ new FlinkMergingNonShuffleReduceFunction<>(
+ combineFn,
+ intervalStrategy,
+ sideInputStrategies,
+ context.getPipelineOptions());
- TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
+ TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
- Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+ Grouping<WindowedValue<KV<K, InputT>>> grouping =
+ new UnsortedGrouping<>(
+ inputDataSet,
+ new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+
+ // Fully reduce the values and create output format OutputT
+ GroupReduceOperator<
+ WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+ new GroupReduceOperator<>(
+ grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
- // Fully reduce the values and create output format VO
- GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
- new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
}
}
-// private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
-//
-// @Override
-// public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) {
-// DataSet<KV<K, Iterable<VI>>> inputDataSet = context.getInputDataSet(transform.getInput());
-//
-// Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn();
-//
-// GroupReduceFunction<KV<K, Iterable<VI>>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
-//
-// TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput());
-//
-// Grouping<KV<K, Iterable<VI>>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
-//
-// GroupReduceOperator<KV<K, Iterable<VI>>, KV<K, VO>> outputDataSet =
-// new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-// context.setOutputDataSet(transform.getOutput(), outputDataSet);
-// }
-// }
-
- private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
+ private static class ParDoBoundTranslatorBatch<InputT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ ParDo.Bound<InputT, OutputT>> {
@Override
- public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
- DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ public void translateNode(
+ ParDo.Bound<InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<InputT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- final DoFn<IN, OUT> doFn = transform.getFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
- TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform));
+ TypeInformation<WindowedValue<OutputT>> typeInformation =
+ context.getTypeInfo(context.getOutput(transform));
- FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
- MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: sideInputs) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
+
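+ // wrap the DoFn in a Flink map-partition function that runs it over
+ // windowed values with access to the side inputs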
+ FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
+ new FlinkDoFnFunction<>(
+ doFn,
+ context.getOutput(transform).getWindowingStrategy(),
+ sideInputStrategies,
+ context.getPipelineOptions());
+
+ MapPartitionOperator<WindowedValue<InputT>, WindowedValue<OutputT>> outputDataSet =
+ new MapPartitionOperator<>(
+ inputDataSet,
+ typeInformation,
+ doFnWrapper,
+ transform.getName());
+
+ transformSideInputs(sideInputs, outputDataSet, context);
context.setOutputDataSet(context.getOutput(transform), outputDataSet);
}
}
- private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
+ private static class ParDoBoundMultiTranslatorBatch<InputT, OutputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ ParDo.BoundMulti<InputT, OutputT>> {
@Override
- public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) {
- DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ public void translateNode(
+ ParDo.BoundMulti<InputT, OutputT> transform,
+ FlinkBatchTranslationContext context) {
+ DataSet<WindowedValue<InputT>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
- final DoFn<IN, OUT> doFn = transform.getFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
- // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this
+ // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this
outputMap.put(transform.getMainOutputTag(), 0);
int count = 1;
for (TupleTag<?> tag: outputs.keySet()) {
@@ -432,147 +683,166 @@ public void translateNode(ParDo.BoundMulti transform, FlinkBatchTransla
}
}
+ // assume that the windowing strategy is the same for all outputs
+ WindowingStrategy<?, ?> windowingStrategy = null;
+
// collect all output Coders and create a UnionCoder for our tagged outputs
List<Coder<?>> outputCoders = Lists.newArrayList();
for (PCollection<?> coll: outputs.values()) {
outputCoders.add(coll.getCoder());
+ windowingStrategy = coll.getWindowingStrategy();
+ }
+
+ if (windowingStrategy == null) {
+ throw new IllegalStateException("No outputs defined.");
}
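+ // all tagged outputs are encoded with a single UnionCoder; the index
+ // stored in outputMap identifies the output a RawUnionValue belongs to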
UnionCoder unionCoder = UnionCoder.of(outputCoders);
- @SuppressWarnings("unchecked")
- TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder);
+ TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ unionCoder,
+ windowingStrategy.getWindowFn().windowCoder()));
- @SuppressWarnings("unchecked")
- FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
- MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+ for (PCollectionView<?> sideInput: sideInputs) {
+ sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+ }
- for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
- TypeInformation