Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
Expand Down Expand Up @@ -330,17 +331,6 @@ public ReadyCheckingSideInputReader createSideInputReader(
return sideInputContainer.createReaderForViews(sideInputs);
}

/**
* A {@link SideInputReader} that additionally lets callers ask whether a {@link PCollectionView}
* has had its contents set in a given window.
*/
interface ReadyCheckingSideInputReader extends SideInputReader {
/**
* Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
*
* @param view the side input view whose readiness is being queried
* @param window the window in which to check the view
* @return {@code true} if {@code view} is ready in {@code window}, {@code false} otherwise
*/
boolean isReady(PCollectionView<?> view, BoundedWindow window);
}

/**
* Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
* of all other {@link CounterSet CounterSets} created by this call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollectionView;

/**
* A {@link SideInputReader} that additionally lets callers ask whether a {@link PCollectionView}
* has had its contents set in a given window.
*/
public interface ReadyCheckingSideInputReader extends SideInputReader {
/**
* Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
*
* @param view the side input view whose readiness is being queried
* @param window the window in which to check the view
* @return {@code true} if {@code view} is ready in {@code window}, {@code false} otherwise
*/
boolean isReady(PCollectionView<?> view, BoundedWindow window);
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.Mean;
Expand All @@ -35,6 +34,7 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing;
import com.google.cloud.dataflow.sdk.util.PCollectionViews;
import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
 * them from {@link #processElementInReadyWindows(WindowedValue)}.
 *
 * <p>A main input window is treated as ready when every {@link PCollectionView} supplied at
 * creation is reported ready by the {@link ReadyCheckingSideInputReader} for the side input window
 * that the view's window function maps the main input window to. Once a window has been observed
 * as not ready it stays not ready until the bundle is finished.
 */
class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
  private final DoFnRunner<InputT, OutputT> underlying;
  private final Collection<PCollectionView<?>> views;
  private final ReadyCheckingSideInputReader sideInputReader;

  /**
   * Main input windows found not ready during the current bundle. Assigned in
   * {@link #startBundle()} and reset to {@code null} in {@link #finishBundle()}, so it is
   * {@code null} outside of a bundle.
   */
  private Set<BoundedWindow> notReadyWindows;

  /**
   * Creates a runner that delegates to {@code underlying} and consults {@code sideInputReader}
   * for the readiness of each of {@code views}.
   *
   * @param underlying the runner that actually processes elements
   * @param views the side input views that must be ready before an element is processed
   * @param sideInputReader the reader used to check whether a view is ready in a window
   * @return a new {@link PushbackSideInputDoFnRunner}
   */
  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
      DoFnRunner<InputT, OutputT> underlying,
      Collection<PCollectionView<?>> views,
      ReadyCheckingSideInputReader sideInputReader) {
    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
  }

  private PushbackSideInputDoFnRunner(
      DoFnRunner<InputT, OutputT> underlying,
      Collection<PCollectionView<?>> views,
      ReadyCheckingSideInputReader sideInputReader) {
    this.underlying = underlying;
    this.views = views;
    this.sideInputReader = sideInputReader;
  }

  /**
   * Starts a fresh record of not-ready windows, then calls the underlying
   * {@link DoFnRunner#startBundle()}.
   */
  @Override
  public void startBundle() {
    notReadyWindows = new HashSet<>();
    underlying.startBundle();
  }

  /**
   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
   * for each window the element is in that is ready.
   *
   * <p>When this runner has no views the element is processed as-is, without being split per
   * window, and nothing is pushed back.
   *
   * @param elem the element to process in all ready windows
   * @return each element that could not be processed because it requires a side input window
   *     that is not ready.
   */
  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
    if (views.isEmpty()) {
      processElement(elem);
      return Collections.emptyList();
    }
    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
      // Each exploded value is expected to carry exactly one window; getOnlyElement enforces it.
      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
      if (isReady(mainInputWindow)) {
        processElement(windowElem);
      } else {
        notReadyWindows.add(mainInputWindow);
        pushedBack.add(windowElem);
      }
    }
    return pushedBack.build();
  }

  /**
   * Returns whether every view is ready for the side input window corresponding to
   * {@code mainInputWindow}.
   *
   * <p>A window already recorded as not ready in this bundle is rejected immediately, without
   * consulting the side input reader again: the outcome for such a window is "not ready"
   * regardless of what the reader would now report.
   */
  private boolean isReady(BoundedWindow mainInputWindow) {
    if (notReadyWindows.contains(mainInputWindow)) {
      return false;
    }
    for (PCollectionView<?> view : views) {
      BoundedWindow sideInputWindow =
          view.getWindowingStrategyInternal()
              .getWindowFn()
              .getSideInputWindow(mainInputWindow);
      if (!sideInputReader.isReady(view, sideInputWindow)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} directly, without any
   * readiness check.
   */
  @Override
  public void processElement(WindowedValue<InputT> elem) {
    underlying.processElement(elem);
  }

  /**
   * Discard the record of not-ready windows, then call the underlying
   * {@link DoFnRunner#finishBundle()}.
   */
  @Override
  public void finishBundle() {
    notReadyWindows = null;
    underlying.finishBundle();
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;

import java.util.Collection;

/**
 * A {@link WindowFn} for use during tests that returns the input window for calls to
 * {@link #getSideInputWindow(BoundedWindow)}.
 */
class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
  /**
   * Returns the windows reported by the assign context, unchanged.
   *
   * @param c the assign context whose windows are returned
   * @return the collection obtained from {@code c.windows()}
   */
  @Override
  public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
      throws Exception {
    // The cast only adjusts the static element type; the collection itself is returned as-is.
    // Suppression is scoped to this one declaration instead of leaving the warning unaddressed.
    @SuppressWarnings("unchecked")
    Collection<BoundedWindow> existingWindows = (Collection<BoundedWindow>) c.windows();
    return existingWindows;
  }

  /** Reports this window function as compatible with every other {@link WindowFn}. */
  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return true;
  }

  /**
   * Returns the {@link GlobalWindow} coder viewed as a coder of {@link BoundedWindow}.
   */
  @Override
  public Coder<BoundedWindow> windowCoder() {
    // not used
    // The raw cast is deliberate: the global window coder's static type does not match
    // Coder<BoundedWindow>, so the conversion goes through the raw type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    Coder<BoundedWindow> coder = (Coder) GlobalWindow.Coder.INSTANCE;
    return coder;
  }

  /**
   * Returns {@code window} itself, so the side input window is always the main input window.
   */
  @Override
  public BoundedWindow getSideInputWindow(BoundedWindow window) {
    return window;
  }
}
Loading