Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -333,16 +334,6 @@ public ReadyCheckingSideInputReader createSideInputReader(
return sideInputContainer.createReaderForViews(sideInputs);
}

/**
* A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
* had its contents set in a window.
*/
static interface ReadyCheckingSideInputReader extends SideInputReader {
/**
* Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
*/
boolean isReady(PCollectionView<?> view, BoundedWindow window);
}

/**
* Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PCollectionViewWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.doAnswer;

import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
Expand All @@ -37,6 +36,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
* them via the {@link #processElementInReadyWindows(WindowedValue)}.
*/
class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
private final DoFnRunner<InputT, OutputT> underlying;
private final Collection<PCollectionView<?>> views;
private final ReadyCheckingSideInputReader sideInputReader;

private Set<BoundedWindow> notReadyWindows;

public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
DoFnRunner<InputT, OutputT> underlying,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader) {
return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
}

private PushbackSideInputDoFnRunner(
DoFnRunner<InputT, OutputT> underlying,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader) {
this.underlying = underlying;
this.views = views;
this.sideInputReader = sideInputReader;
}

@Override
public void startBundle() {
notReadyWindows = new HashSet<>();
underlying.startBundle();
}

/**
* Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
* for each window the element is in that is ready.
*
* @param elem the element to process in all ready windows
* @return each element that could not be processed because it requires a side input window
* that is not ready.
*/
public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
if (views.isEmpty()) {
processElement(elem);
return Collections.emptyList();
}
ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
Copy link
Member

@kennknowles kennknowles May 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to explode windows in this class, or can it be separated out?

I suppose the code here won't really change much, since you have to access the contents of the WindowedValue anyhow. But the helper method could be simplified.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is required to return only the (value, window)s that could not be processed, instead of The (value, window*)s that contain a window that could not be processed. This lets us do as much work as possible

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I meant somewhat the opposite. I meant to suggest that you explode values prior to reaching this point. But since the type would still be WindowedValue it wouldn't really buy you much.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally it's left up to the DoFnRunner to decide if it should explode windows or not (to not make redundant calls if possible), or at least that's how the existing FnRunners work

BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
boolean isReady = !notReadyWindows.contains(mainInputWindow);
for (PCollectionView<?> view : views) {
BoundedWindow sideInputWindow =
view.getWindowingStrategyInternal()
.getWindowFn()
.getSideInputWindow(mainInputWindow);
if (!sideInputReader.isReady(view, sideInputWindow)) {
isReady = false;
break;
}
}
if (isReady) {
processElement(windowElem);
} else {
notReadyWindows.add(mainInputWindow);
pushedBack.add(windowElem);
}
}
return pushedBack.build();
}

@Override
public void processElement(WindowedValue<InputT> elem) {
underlying.processElement(elem);
}

/**
* Call the underlying {@link DoFnRunner#finishBundle()}.
*/
@Override
public void finishBundle() {
notReadyWindows = null;
underlying.finishBundle();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.util;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;

/**
* A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
* had its contents set in a window.
*/
public interface ReadyCheckingSideInputReader extends SideInputReader {
/**
* Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
*/
boolean isReady(PCollectionView<?> view, BoundedWindow window);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;

import java.util.Collection;

/**
* A {@link WindowFn} for use during tests that returns the input window for calls to
* {@link #getSideInputWindow(BoundedWindow)}.
*/
class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@Override
public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
throws Exception {
return (Collection<BoundedWindow>) c.windows();
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return true;
}

@Override
public Coder<BoundedWindow> windowCoder() {
// not used
return (Coder) GlobalWindow.Coder.INSTANCE;
}

@Override
public BoundedWindow getSideInputWindow(BoundedWindow window) {
return window;
}
}
Loading