Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,8 @@ private boolean fireTimers() throws Exception {
KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
evaluationContext
.createKeyedBundle(
null, keyTimers.getKey(), (PCollection) transform.getInput())
InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
(PCollection) transform.getInput(), keyTimers.getKey())
.add(WindowedValue.valueInEmptyWindows(work))
.commit(Instant.now());
scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class InProcessEvaluationContext {
/** The options that were used to create this {@link Pipeline}. */
private final InProcessPipelineOptions options;

private final BundleFactory bundleFactory;
/** The current processing time and event time watermarks and timers. */
private final InMemoryWatermarkManager watermarkManager;

Expand All @@ -92,24 +91,21 @@ class InProcessEvaluationContext {

public static InProcessEvaluationContext create(
InProcessPipelineOptions options,
BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
return new InProcessEvaluationContext(
options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
options, rootTransforms, valueToConsumers, stepNames, views);
}

private InProcessEvaluationContext(
InProcessPipelineOptions options,
BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
this.options = checkNotNull(options);
this.bundleFactory = checkNotNull(bundleFactory);
checkNotNull(rootTransforms);
checkNotNull(valueToConsumers);
checkNotNull(stepNames);
Expand Down Expand Up @@ -209,15 +205,17 @@ private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransfor
* Create a {@link UncommittedBundle} for use by a source.
*/
public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
return bundleFactory.createRootBundle(output);
return InProcessBundle.unkeyed(output);
}

/**
* Create a {@link UncommittedBundle} whose elements belong to the specified {@link
* PCollection}.
*/
public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
return bundleFactory.createBundle(input, output);
return input.isKeyed()
? InProcessBundle.keyed(output, input.getKey())
: InProcessBundle.unkeyed(output);
}

/**
Expand All @@ -226,7 +224,7 @@ public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollecti
*/
public <T> UncommittedBundle<T> createKeyedBundle(
CommittedBundle<?> input, Object key, PCollection<T> output) {
return bundleFactory.createKeyedBundle(input, key, output);
return InProcessBundle.keyed(output, key);
}

/**
Expand Down Expand Up @@ -355,9 +353,7 @@ public CounterSet getCounters() {
* for each time they are set.
*/
public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired =
watermarkManager.extractFiredTimers();
return fired;
return watermarkManager.extractFiredTimers();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ public InProcessPipelineResult run(Pipeline pipeline) {
InProcessEvaluationContext context =
InProcessEvaluationContext.create(
getPipelineOptions(),
createBundleFactory(getPipelineOptions()),
consumerTrackingVisitor.getRootTransforms(),
consumerTrackingVisitor.getValueToConsumers(),
consumerTrackingVisitor.getStepNames(),
Expand Down Expand Up @@ -270,10 +269,6 @@ public InProcessPipelineResult run(Pipeline pipeline) {
return Collections.emptyMap();
}

private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
return InProcessBundleFactory.create();
}

/**
* The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public class IntervalBoundedExponentialBackOff implements BackOff {
private final long initialIntervalMillis;
private int currentAttempt;

// BEAM-168: https://issues.apache.org/jira/browse/BEAM-168
@Deprecated
public IntervalBoundedExponentialBackOff(int maximumIntervalMillis, long initialIntervalMillis) {
this((long) maximumIntervalMillis, initialIntervalMillis);
}

public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) {
Preconditions.checkArgument(
maximumIntervalMillis > 0, "Maximum interval must be greater than zero.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,19 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti

ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
W active = activeWindows.mergeResultWindow(window);
Preconditions.checkState(active != null, "Window %s has no mergeResultWindow", window);
if (triggerRunner.isClosed(directContext.state())) {
// This window has already been closed.
droppedDueToClosedWindow.addValue(1L);
WindowTracing.debug(
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
continue;
}

W active = activeWindows.representative(window);
Preconditions.checkState(active != null, "Window %s has no representative", window);
windows.add(active);
}

Expand All @@ -456,24 +467,10 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti
triggerRunner.prefetchForValue(window, directContext.state());
}

// Process the element for each (mergeResultWindow, not closed) window it belongs to.
List<W> triggerableWindows = new ArrayList<>(windows.size());
// Process the element for each (representative, not closed) window it belongs to.
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
if (triggerRunner.isClosed(directContext.state())) {
// This window has already been closed.
droppedDueToClosedWindow.addValue(1L);
WindowTracing.debug(
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
continue;
}

triggerableWindows.add(window);
activeWindows.ensureWindowIsActive(window);
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;

import org.hamcrest.Matchers;
Expand Down Expand Up @@ -817,10 +818,10 @@ public void processElement(ProcessContext c) throws Exception {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("foo", "bar")
.add("foo2", DataflowPipelineTranslatorTest.class)
.withLabel("Test Class")
.withLinkUrl("http://www.google.com");
.add("foo", "bar")
.add("foo2", 123)
.withLabel("Test Value")
.withLinkUrl("http://www.google.com");
}
};

Expand All @@ -836,10 +837,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
};

ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1);
ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2);
pipeline
.apply(Create.of(1, 2, 3))
.apply(ParDo.of(fn1))
.apply(ParDo.of(fn2));
.apply(parDo1)
.apply(parDo2);

Job job = translator.translate(
pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
Expand All @@ -856,34 +859,33 @@ public void populateDisplayData(DisplayData.Builder builder) {
Collection<Map<String, String>> fn2displayData =
(Collection<Map<String, String>>) parDo2Properties.get("display_data");

ImmutableList expectedFn1DisplayData = ImmutableList.of(
ImmutableMap.<String, String>builder()
.put("namespace", fn1.getClass().getName())
.put("key", "foo")
.put("type", "STRING")
.put("value", "bar")
.build(),
ImmutableMap.<String, String>builder()
.put("namespace", fn1.getClass().getName())
.put("key", "foo2")
.put("type", "JAVA_CLASS")
.put("value", DataflowPipelineTranslatorTest.class.getName())
.put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
.put("label", "Test Class")
.put("linkUrl", "http://www.google.com")
.build()
ImmutableSet<ImmutableMap<String, String>> expectedFn1DisplayData = ImmutableSet.of(
ImmutableMap.<String, String>builder()
.put("key", "foo")
.put("type", "STRING")
.put("value", "bar")
.put("namespace", fn1.getClass().getName())
.build(),
ImmutableMap.<String, String>builder()
.put("key", "foo2")
.put("type", "INTEGER")
.put("value", "123")
.put("namespace", fn1.getClass().getName())
.put("label", "Test Value")
.put("linkUrl", "http://www.google.com")
.build()
);

ImmutableList expectedFn2DisplayData = ImmutableList.of(
ImmutableMap.<String, String>builder()
.put("namespace", fn2.getClass().getName())
.put("key", "foo3")
.put("type", "STRING")
.put("value", "barge")
.build()
ImmutableSet<ImmutableMap<String, String>> expectedFn2DisplayData = ImmutableSet.of(
ImmutableMap.<String, String>builder()
.put("key", "foo3")
.put("type", "STRING")
.put("value", "barge")
.put("namespace", fn2.getClass().getName())
.build()
);

assertEquals(expectedFn1DisplayData, fn1displayData);
assertEquals(expectedFn2DisplayData, fn2displayData);
assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -61,7 +60,6 @@ public class BoundedReadEvaluatorFactoryTest {
private PCollection<Long> longs;
private TransformEvaluatorFactory factory;
@Mock private InProcessEvaluationContext context;
private BundleFactory bundleFactory;

@Before
public void setup() {
Expand Down Expand Up @@ -170,7 +168,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception {
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();

UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
when(context.createRootBundle(pcollection)).thenReturn(output);

TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
Expand All @@ -188,7 +186,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();

UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
when(context.createRootBundle(pcollection)).thenReturn(output);

TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -42,6 +44,7 @@
/**
* Tests for {@link EncodabilityEnforcementFactory}.
*/
@RunWith(JUnit4.class)
public class EncodabilityEnforcementFactoryTest {
@Rule public ExpectedException thrown = ExpectedException.none();
private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ public void setup() {
context =
InProcessEvaluationContext.create(
runner.getPipelineOptions(),
InProcessBundleFactory.create(),
rootTransforms,
valueToConsumers,
cVis.getStepNames(),
Expand Down Expand Up @@ -157,9 +156,7 @@ public void getExecutionContextSameStepSameKeyState() {
stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);

context.handleResult(
InProcessBundleFactory.create()
.createKeyedBundle(null, "foo", created)
.commit(Instant.now()),
InProcessBundle.keyed(created, "foo").commit(Instant.now()),
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(created.getProducingTransformInternal())
.withState(stepContext.commitState())
Expand Down Expand Up @@ -251,7 +248,7 @@ public void handleResultMergesCounters() {
.withCounters(againCounters)
.build();
context.handleResult(
context.createRootBundle(created).commit(Instant.now()),
InProcessBundle.unkeyed(created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
secondResult);
assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
Expand All @@ -278,7 +275,7 @@ public void handleResultStoresState() {
.build();

context.handleResult(
context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
InProcessBundle.keyed(created, myKey).commit(Instant.now()),
ImmutableList.<TimerData>of(),
stateResult);

Expand Down Expand Up @@ -360,7 +357,7 @@ public void extractFiredTimersExtractsTimers() {
// haven't added any timers, must be empty
assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
context.handleResult(
context.createKeyedBundle(null, key, created).commit(Instant.now()),
InProcessBundle.keyed(created, key).commit(Instant.now()),
ImmutableList.<TimerData>of(),
timerResult);

Expand Down
Loading