From 033b9240765543438068c1adea6d0cff34ddcd53 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 28 Mar 2016 11:31:38 +0200 Subject: [PATCH 1/2] [flink] improve lifecycle handling of GroupAlsoByWindowWrapper --- .../wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index b413d7a595a4..751d44c0ec56 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -220,6 +220,7 @@ private FlinkGroupAlsoByWindowWrapper(PipelineOptions options, public void open() throws Exception { super.open(); this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); + operator.startBundle(context); } /** @@ -252,11 +253,7 @@ private DoFn, KV> creat private void processKeyedWorkItem(KeyedWorkItem workItem) throws Exception { context.setElement(workItem, getStateInternalsForKey(workItem.key())); - - // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. - operator.startBundle(context); operator.processElement(context); - operator.finishBundle(context); } @Override @@ -309,6 +306,7 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void close() throws Exception { + operator.finishBundle(context); super.close(); } From 63a7c3d0cb51caf65dc82141671cf28d47c2be39 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 30 Mar 2016 12:02:01 +0200 Subject: [PATCH 2/2] [flink] improve readability of processElement function --- .../streaming/FlinkGroupAlsoByWindowWrapper.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 751d44c0ec56..3dc5a799f557 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -258,10 +258,18 @@ private void processKeyedWorkItem(KeyedWorkItem workItem) throws Excepti @Override public void processElement(StreamRecord>> element) throws Exception { - ArrayList> elements = new ArrayList<>(); - elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), - element.getValue().getWindows(), element.getValue().getPane())); - processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); + final WindowedValue> windowedValue = element.getValue(); + final KV kv = windowedValue.getValue(); + + final WindowedValue updatedWindowedValue = WindowedValue.of(kv.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPane()); + + processKeyedWorkItem( + KeyedWorkItems.elementsWorkItem( + kv.getKey(), + Collections.singletonList(updatedWindowedValue))); } @Override