Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.runners.inprocess;

import org.apache.beam.sdk.util.SerializableUtils;

import com.google.common.base.Supplier;

import java.io.Serializable;

/**
 * A {@link Supplier} that never hands out the value it was created with directly: every call to
 * {@link #get()} returns the result of passing that value through
 * {@link SerializableUtils#clone}.
 *
 * @param <T> the type of value supplied
 */
class CloningSupplier<T extends Serializable> implements Supplier<T> {
  /** The value that is cloned on each {@link #get()}. */
  private final T value;

  private CloningSupplier(T value) {
    this.value = value;
  }

  /**
   * Creates a {@link CloningSupplier} backed by the provided value.
   *
   * @param value the value to clone whenever {@link #get()} is called
   * @return a new supplier wrapping {@code value}
   */
  public static <ValueT extends Serializable> CloningSupplier<ValueT> forValue(ValueT value) {
    CloningSupplier<ValueT> supplier = new CloningSupplier<>(value);
    return supplier;
  }

  /** Returns the result of {@link SerializableUtils#clone} applied to the wrapped value. */
  @Override
  public T get() {
    T copy = SerializableUtils.clone(value);
    return copy;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.runners.inprocess;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.inprocess.InProcessGroupByKey.InProcessGroupAlsoByWindow;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;

import java.util.Collections;

/**
* The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
* {@link InProcessGroupAlsoByWindow} {@link PTransform} (as opposed to the
* {@link GroupByKeyOnly}-style key-grouping step).
*/
class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
  /**
   * Creates a {@link TransformEvaluator} for one application of
   * {@link InProcessGroupAlsoByWindow}.
   *
   * <p>The wildcard-typed arguments are narrowed with unchecked raw casts before being passed to
   * {@code createEvaluator}. Nothing in this method verifies that {@code application} really is
   * an {@link InProcessGroupAlsoByWindow} application, so that is presumably guaranteed by
   * whoever selects this factory (TODO confirm against the caller).
   */
  @Override
  public <InputT> TransformEvaluator<InputT> forApplication(
      AppliedPTransform<?, ?, ?> application,
      CommittedBundle<?> inputBundle,
      InProcessEvaluationContext evaluationContext) {
    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
    TransformEvaluator<InputT> evaluator =
        createEvaluator(
            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
    return evaluator;
  }

  /** Builds the fully-typed evaluator for the given application and input bundle. */
  private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
      AppliedPTransform<
              PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
              InProcessGroupAlsoByWindow<K, V>>
          application,
      CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
      InProcessEvaluationContext evaluationContext) {
    return new InProcessGroupAlsoByWindowEvaluator<K, V>(
        evaluationContext, inputBundle, application);
  }

  /**
   * A transform evaluator for {@link InProcessGroupAlsoByWindow}.
   *
   * <p>The evaluator builds a {@link GroupAlsoByWindowViaWindowSetDoFn} from the transform's
   * {@link WindowingStrategy} and a buffering {@link SystemReduceFn}, then forwards every element
   * and the end-of-bundle call to a {@link ParDoInProcessEvaluator} that runs that {@link DoFn}.
   *
   * @see GroupByKeyViaGroupByKeyOnly
   */
  private static class InProcessGroupAlsoByWindowEvaluator<K, V>
      implements TransformEvaluator<KeyedWorkItem<K, V>> {

    /** The evaluator that actually runs the group-also-by-window {@link DoFn}. */
    private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;

    /**
     * @param evaluationContext the context handed through to the underlying ParDo evaluator
     * @param inputBundle the bundle being processed; its {@link PCollection}'s coder is used to
     *     obtain the value coder
     * @param application the {@link InProcessGroupAlsoByWindow} application being evaluated
     */
    public InProcessGroupAlsoByWindowEvaluator(
        final InProcessEvaluationContext evaluationContext,
        CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
        final AppliedPTransform<
                PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
                InProcessGroupAlsoByWindow<K, V>>
            application) {
      // The value coder is extracted from the coder of the input PCollection by the transform.
      Coder<V> valueCoder =
          application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());

      // The transform stores its strategy with wildcard type arguments; the DoFn factory below
      // needs the window type fixed to BoundedWindow, hence the unchecked cast.
      @SuppressWarnings("unchecked")
      WindowingStrategy<?, BoundedWindow> windowingStrategy =
          (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();

      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
          GroupAlsoByWindowViaWindowSetDoFn.create(
              windowingStrategy,
              SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));

      TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};

      // Not technically legit, as the application is not a ParDo
      this.gabwParDoEvaluator =
          ParDoInProcessEvaluator.create(
              evaluationContext,
              inputBundle,
              application,
              Suppliers.ofInstance(gabwDoFn),
              Collections.<PCollectionView<?>>emptyList(),
              mainOutputTag,
              Collections.<TupleTag<?>>emptyList(),
              ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
    }

    /** Forwards the element to the underlying ParDo evaluator. */
    @Override
    public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
      gabwParDoEvaluator.processElement(element);
    }

    /** Finishes the bundle on the underlying ParDo evaluator and returns its result. */
    @Override
    public InProcessTransformResult finishBundle() throws Exception {
      return gabwParDoEvaluator.finishBundle();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.runners.inprocess;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * A {@link ForwardingPTransform} around a {@link GroupByKey} whose {@link #apply} expands into
 * {@link ReifyTimestampsAndWindows}, {@link InProcessGroupByKeyOnly} and
 * {@link InProcessGroupAlsoByWindow}, in that order.
 */
class InProcessGroupByKey<K, V>
    extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
  /** The wrapped {@link GroupByKey}; it is returned by {@link #delegate()}. */
  private final GroupByKey<K, V> original;

  InProcessGroupByKey(GroupByKey<K, V> from) {
    this.original = from;
  }

  @Override
  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
    return original;
  }

  /**
   * Expands the grouping into its in-process steps and sets the coder and windowing strategy of
   * the intermediate and final {@link PCollection PCollections}.
   *
   * <p>The input coder is cast to {@link KvCoder} without a check, so an input with any other
   * coder fails with a {@link ClassCastException}.
   */
  @Override
  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
    @SuppressWarnings("unchecked")
    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();

    // This operation groups by the combination of key and window,
    // merging windows as needed, using the windows assigned to the
    // key/value input elements and the window merge operation of the
    // window function associated with the input PCollection.
    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();

    // By default, implement GroupByKey via a series of lower-level operations.
    return input
        // Make each input element's timestamp and assigned windows
        // explicit, in the value part.
        .apply(new ReifyTimestampsAndWindows<K, V>())
        .apply(new InProcessGroupByKeyOnly<K, V>())
        .setCoder(
            KeyedWorkItemCoder.of(
                inputCoder.getKeyCoder(),
                inputCoder.getValueCoder(),
                input.getWindowingStrategy().getWindowFn().windowCoder()))

        // Group each key's values by window, merging windows as needed.
        .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow<K, V>(windowingStrategy))

        // And update the windowing strategy as appropriate.
        .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
        .setCoder(
            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
  }

  /**
   * The key-grouping step of {@link InProcessGroupByKey}. Applying it only declares a primitive
   * output {@link PCollection} of {@link KeyedWorkItem KeyedWorkItems} that shares the input's
   * pipeline, windowing strategy and boundedness.
   */
  static final class InProcessGroupByKeyOnly<K, V>
      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
    @Override
    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
    }

    InProcessGroupByKeyOnly() {}
  }

  /**
   * The window-grouping step of {@link InProcessGroupByKey}. It carries the
   * {@link WindowingStrategy} to group with and exposes helpers for pulling the key and value
   * coders out of the input's {@link KeyedWorkItemCoder}.
   */
  static final class InProcessGroupAlsoByWindow<K, V>
      extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {

    private final WindowingStrategy<?, ?> windowingStrategy;

    public InProcessGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
      this.windowingStrategy = windowingStrategy;
    }

    /** Returns the {@link WindowingStrategy} this transform was constructed with. */
    public WindowingStrategy<?, ?> getWindowingStrategy() {
      return windowingStrategy;
    }

    /**
     * Narrows the input coder to a {@link KeyedWorkItemCoder}.
     *
     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
     */
    private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
      // Coder<KeyedWorkItem<...>> --> KeyedWorkItemCoder<...>
      checkArgument(
          inputCoder instanceof KeyedWorkItemCoder,
          "%s requires a %s<...> but got %s",
          getClass().getSimpleName(),
          // Name the coder type that is actually required, so the message matches the check.
          KeyedWorkItemCoder.class.getSimpleName(),
          inputCoder);
      @SuppressWarnings("unchecked")
      KeyedWorkItemCoder<K, V> keyedWorkItemCoder = (KeyedWorkItemCoder<K, V>) inputCoder;
      return keyedWorkItemCoder;
    }

    /**
     * Returns the key coder of the given input coder.
     *
     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
     */
    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
    }

    /**
     * Returns the element coder of the given input coder.
     *
     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
     */
    public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
      return getKeyedWorkItemCoder(inputCoder).getElementCoder();
    }

    /**
     * Declares a primitive output {@link PCollection} that shares the input's pipeline, windowing
     * strategy and boundedness.
     */
    @Override
    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
    }
  }
}
Loading