Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Gathers all panes of each window into exactly one output.
 *
 * <p>Note that this will delay the output of a window until the garbage collection time (when the
 * watermark passes the end of the window plus allowed lateness) even if the upstream triggers
 * closed the window earlier.
 *
 * @param <T> the type of the elements of the input {@link PCollection}
 */
public class GatherAllPanes<T>
    extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>> {
  /**
   * Gathers all panes of each window into a single output element.
   *
   * <p>This will gather all output panes into a single element, which causes them to be colocated
   * on a single worker. As a result, this is only suitable for {@link PCollection PCollections}
   * where all of the output elements for each pane fit in memory, such as in tests.
   *
   * @param <T> the type of the elements of the input {@link PCollection}
   * @return a new {@link GatherAllPanes} transform
   */
  public static <T> GatherAllPanes<T> globally() {
    return new GatherAllPanes<>();
  }

  /** Instances are obtained through {@link #globally()}. */
  private GatherAllPanes() {}

  /**
   * Keys every element with the same {@code null} key, reifies its timestamp and window into a
   * {@link WindowedValue}, re-windows with a {@link Never} trigger, and groups by key so that each
   * window yields one {@link Iterable} of all of its reified elements.
   *
   * @param input the collection whose panes should be gathered
   * @return a collection of gathered panes carrying the windowing strategy of {@code input}
   */
  @Override
  public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
    WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();

    return input
        .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
        .apply(new ReifyTimestampsAndWindows<Void, T>())
        .apply(
            Window.into(
                    new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
                        originalWindowFn.windowCoder(),
                        originalWindowFn.assignsToSingleWindow()))
                .triggering(Never.ever())
                // Setting a non-default trigger is expected to require an explicit allowed
                // lateness and accumulation mode. Reuse the input's allowed lateness so the
                // garbage collection time is unchanged, and state the mode explicitly so the
                // transform does not depend on the input having specified one.
                .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
                .discardingFiredPanes())
        // all values have the same key so they all appear as a single output element
        .apply(GroupByKey.<Void, WindowedValue<T>>create())
        .apply(Values.<Iterable<WindowedValue<T>>>create())
        // Restore the caller's windowing strategy on the output.
        .setWindowingStrategyInternal(input.getWindowingStrategy());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> inp
}

/**
* Helper transform that makes timestamps and window assignments
* explicit in the value part of each key/value pair.
* Helper transform that makes timestamps and window assignments explicit in the value part of
* each key/value pair.
*/
public static class ReifyTimestampsAndWindows<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
Expand All @@ -137,7 +137,8 @@ public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {

// The requirement to use a KvCoder *is* actually a model-level requirement, not specific
// to this implementation of GBK. All runners need a way to get the key.
checkArgument(input.getCoder() instanceof KvCoder,
checkArgument(
input.getCoder() instanceof KvCoder,
"%s requires its input to use a %s",
GroupByKey.class.getSimpleName(),
KvCoder.class.getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import static org.junit.Assert.fail;

import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

import com.google.common.collect.Iterables;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.Serializable;

/**
 * Tests for {@link GatherAllPanes}.
 *
 * <p>The class is {@link Serializable} because the anonymous {@link SerializableFunction}
 * instances declared in the test methods are inner classes of the test instance.
 */
@RunWith(JUnit4.class)
public class GatherAllPanesTest implements Serializable {
  /**
   * With only an on-time firing per window upstream, no gathered output may contain more than one
   * pane.
   */
  @Test
  public void singlePaneSingleReifiedPane() {
    TestPipeline p = TestPipeline.create();
    PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
        p.apply(CountingInput.upTo(20000))
            .apply(
                WithTimestamps.of(
                    new SerializableFunction<Long, Instant>() {
                      @Override
                      public Instant apply(Long input) {
                        return new Instant(input * 10);
                      }
                    }))
            .apply(
                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
            .apply(GroupByKey.<Void, Long>create())
            .apply(Values.<Iterable<Long>>create())
            .apply(GatherAllPanes.<Iterable<Long>>globally());

    PAssert.that(accumulatedPanes)
        .satisfies(
            new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() {
              @Override
              public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) {
                for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) {
                  if (Iterables.size(windowedInput) > 1) {
                    fail("Expected all windows to have exactly one pane, got " + windowedInput);
                  }
                }
                return null;
              }
            });

    // The PAssert above is only evaluated when the pipeline is actually executed.
    p.run();
  }

  /**
   * With an early firing after every element upstream, at least one gathered output must contain
   * more than one pane.
   */
  @Test
  public void multiplePanesMultipleReifiedPane() {
    TestPipeline p = TestPipeline.create();

    PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
        p.apply(CountingInput.upTo(20000))
            .apply(
                WithTimestamps.of(
                    new SerializableFunction<Long, Instant>() {
                      @Override
                      public Instant apply(Long input) {
                        return new Instant(input * 10);
                      }
                    }))
            .apply(
                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
                    .triggering(
                        AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
            .apply(GroupByKey.<Void, Long>create())
            .apply(Values.<Iterable<Long>>create())
            .apply(GatherAllPanes.<Iterable<Long>>globally());

    PAssert.that(accumulatedPanes)
        .satisfies(
            new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() {
              @Override
              public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) {
                for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) {
                  if (Iterables.size(windowedInput) > 1) {
                    // Found a window with several gathered panes; the expectation is met.
                    return null;
                  }
                }
                fail("Expected at least one window to have multiple panes");
                return null;
              }
            });

    // The PAssert above is only evaluated when the pipeline is actually executed.
    p.run();
  }
}