Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;

Expand Down Expand Up @@ -157,6 +159,12 @@ public static FlinkPipelineRunner createForTest(boolean streaming) {
@Override
public <Output extends POutput, Input extends PInput> Output apply(
    PTransform<Input, Output> transform, Input input) {

  // In batch mode, expand GroupByKey into GroupByKeyOnly -> GroupAlsoByWindow so
  // the batch translator only has to handle the simpler GroupByKeyOnly primitive.
  if (!options.isStreaming() && transform.getClass().equals(GroupByKey.class)) {
    // Safe by construction: GroupByKeyViaGroupByKeyOnly produces the same
    // output type as the GroupByKey it replaces, so the Output cast holds.
    // The raw GroupByKey cast is unavoidable here because K/V are not in scope.
    @SuppressWarnings({"unchecked", "rawtypes"})
    Output output =
        (Output) super.apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform), input);
    return output;
  }

  return super.apply(transform, input);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
Expand All @@ -52,6 +51,12 @@
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
Expand All @@ -78,12 +83,14 @@
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.core.fs.Path;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -114,8 +121,9 @@ public class FlinkBatchTransformTranslators {

TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());

// TODO we're currently ignoring windows here but that has to change in the future
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
TRANSLATORS.put(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());

TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());

TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
Expand Down Expand Up @@ -303,13 +311,64 @@ public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext
}
}

/**
 * Batch translation for {@code Window.Bound}: turns window assignment into a Flink
 * {@code MapPartitionOperator} that runs a {@code DoFn} wrapping the transform's
 * {@code WindowFn}, re-emitting each element into its assigned windows.
 */
public static class WindowBoundTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {

  @Override
  public void translateNode(Window.Bound<T> transform, FlinkBatchTranslationContext context) {
    PValue input = context.getInput(transform);
    DataSet<T> inputDataSet = context.getInputDataSet(input);

    // The windowing strategy is read from the OUTPUT collection — i.e. the
    // strategy that this Window.Bound transform itself establishes.
    @SuppressWarnings("unchecked")
    final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
        (WindowingStrategy<T, ? extends BoundedWindow>)
            context.getOutput(transform).getWindowingStrategy();

    final WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
    // Wrap the window-assigning DoFn so Flink can execute it as a map-partition.
    FlinkDoFnFunction<T, T> doFnWrapper = new FlinkDoFnFunction<>(createWindowAssigner(windowFn), context.getPipelineOptions());

    TypeInformation<T> typeInformation = context.getTypeInfo(context.getOutput(transform));
    MapPartitionOperator<T, T> outputDataSet = new MapPartitionOperator<T, T>(inputDataSet, typeInformation, doFnWrapper, transform.getName());

    context.setOutputDataSet(context.getOutput(transform), outputDataSet);
  }

  /**
   * Builds a {@code DoFn} that asks {@code windowFn} to assign windows to each
   * element and then re-emits the element unchanged (same value, timestamp and
   * pane) into those windows via {@code outputWindowedValue}.
   */
  private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
    return new DoFn<T, T>() {

      @Override
      public void processElement(final ProcessContext c) throws Exception {
        // AssignContext gives the WindowFn access to the current element, its
        // timestamp, and the windows it currently belongs to.
        Collection<W> windows = windowFn.assignWindows(
            windowFn.new AssignContext() {
              @Override
              public T element() {
                return c.element();
              }

              @Override
              public Instant timestamp() {
                return c.timestamp();
              }

              @Override
              public Collection<? extends BoundedWindow> windows() {
                return c.windowingInternals().windows();
              }
            });

        // Re-emit with only the window set changed.
        c.windowingInternals().outputWindowedValue(
            c.element(), c.timestamp(), windows, c.pane());
      }
    };
  }
}

/**
* Translates a GroupByKey while ignoring window assignments. Currently ignores windows.
* Translates a {@link GroupByKeyOnly}, which ignores window assignments.
*/
private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKeyOnly<K, V>> {

@Override
public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
public void translateNode(GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();

Expand Down