Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;

Expand Down Expand Up @@ -225,7 +225,7 @@ public interface PipelineOptions {
@Description("The pipeline runner that will be used to execute the pipeline. "
+ "For registered runners, the class name can be specified, otherwise the fully "
+ "qualified name needs to be specified.")
@Default.Class(DirectPipelineRunner.class)
@Default.Class(InProcessPipelineRunner.class)
Class<? extends PipelineRunner<?>> getRunner();
void setRunner(Class<? extends PipelineRunner<?>> kls);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.beam.sdk.util.MutationDetector;
import org.apache.beam.sdk.util.MutationDetectors;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;

Expand Down Expand Up @@ -113,17 +112,16 @@ public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
try {
detector.verifyUnmodified();
} catch (IllegalMutationException exn) {
throw UserCodeException.wrap(
new IllegalMutationException(
String.format(
"PTransform %s mutated value %s after it was output (new value was %s)."
+ " Values must not be mutated in any way after being output.",
underlying.getPCollection().getProducingTransformInternal().getFullName(),
exn.getSavedValue(),
exn.getNewValue()),
throw new IllegalMutationException(
String.format(
"PTransform %s mutated value %s after it was output (new value was %s)."
+ " Values must not be mutated in any way after being output.",
underlying.getPCollection().getProducingTransformInternal().getFullName(),
exn.getSavedValue(),
exn.getNewValue(),
exn));
exn.getNewValue()),
exn.getSavedValue(),
exn.getNewValue(),
exn);
}
}
return underlying.commit(synchronizedProcessingTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.RestoreSystemProperties;

Expand Down Expand Up @@ -60,7 +61,7 @@
@RunWith(JUnit4.class)
public class PipelineOptionsFactoryTest {
private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS =
DirectPipelineRunner.class;
InProcessPipelineRunner.class;

@Rule public ExpectedException expectedException = ExpectedException.none();
@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void testDynamicAs() {

@Test
public void testDefaultRunnerIsSet() {
assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
assertEquals(InProcessPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,45 +128,46 @@ public void testCompositeCapture() throws Exception {
final EnumSet<TransformsSeen> left =
EnumSet.noneOf(TransformsSeen.class);

p.traverseTopologically(new Pipeline.PipelineVisitor() {
@Override
public void enterCompositeTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
if (transform instanceof Sample.SampleAny) {
assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
} else if (transform instanceof Write.Bound) {
assertTrue(visited.add(TransformsSeen.WRITE));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
}
assertThat(transform, not(instanceOf(Read.Bounded.class)));
}

@Override
public void leaveCompositeTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
if (transform instanceof Sample.SampleAny) {
assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
}
}

@Override
public void visitTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
// Pick is a composite, should not be visited here.
assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
assertThat(transform, not(instanceOf(Write.Bound.class)));
if (transform instanceof Read.Bounded) {
assertTrue(visited.add(TransformsSeen.READ));
}
}

@Override
public void visitValue(PValue value, TransformTreeNode producer) {
}
});
p.traverseTopologically(
new Pipeline.PipelineVisitor() {
@Override
public void enterCompositeTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
if (transform instanceof Sample.SampleAny) {
assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
} else if (transform instanceof Write.Bound) {
assertTrue(visited.add(TransformsSeen.WRITE));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
}
assertThat(transform, not(instanceOf(Read.Bounded.class)));
}

@Override
public void leaveCompositeTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
if (transform instanceof Sample.SampleAny) {
assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
}
}

@Override
public void visitTransform(TransformTreeNode node) {
PTransform<?, ?> transform = node.getTransform();
// Pick is a composite, should not be visited here.
assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
assertThat(transform, not(instanceOf(Write.Bound.class)));
if (transform instanceof Read.Bounded
&& node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
assertTrue(visited.add(TransformsSeen.READ));
}
}

@Override
public void visitValue(PValue value, TransformTreeNode producer) {}
});

assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class EncodabilityEnforcementFactoryTest {
public void encodeFailsThrows() {
TestPipeline p = TestPipeline.create();
PCollection<Record> unencodable =
p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
AppliedPTransform<?, ?, ?> consumer =
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.runners.inprocess;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.coders.ByteArrayCoder;
Expand All @@ -31,7 +30,6 @@
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;

Expand Down Expand Up @@ -163,10 +161,9 @@ public void mutationAfterAddRootBundleThrows() {
root.add(WindowedValue.valueInGlobalWindow(array));

array[1] = 2;
thrown.expect(UserCodeException.class);
thrown.expectCause(isA(IllegalMutationException.class));
thrown.expect(IllegalMutationException.class);
thrown.expectMessage("Values must not be mutated in any way after being output");
CommittedBundle<byte[]> committed = root.commit(Instant.now());
root.commit(Instant.now());
}

@Test
Expand All @@ -184,10 +181,9 @@ public void mutationAfterAddKeyedBundleThrows() {
keyed.add(windowedArray);

array[0] = Byte.MAX_VALUE;
thrown.expect(UserCodeException.class);
thrown.expectCause(isA(IllegalMutationException.class));
thrown.expect(IllegalMutationException.class);
thrown.expectMessage("Values must not be mutated in any way after being output");
CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
keyed.commit(Instant.now());
}

@Test
Expand All @@ -205,10 +201,9 @@ public void mutationAfterAddCreateBundleThrows() {
intermediate.add(windowedArray);

array[2] = -3;
thrown.expect(UserCodeException.class);
thrown.expectCause(isA(IllegalMutationException.class));
thrown.expect(IllegalMutationException.class);
thrown.expectMessage("Values must not be mutated in any way after being output");
CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
intermediate.commit(Instant.now());
}

private static class IdentityDoFn<T> extends DoFn<T, T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.hamcrest.core.AnyOf.anyOf;
Expand All @@ -36,7 +35,6 @@
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
Expand Down Expand Up @@ -1119,7 +1117,7 @@ public void testSideOutputUnknownCoder() throws Exception {
input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));

thrown.expect(PipelineExecutionException.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder");
pipeline.run();
}
Expand Down Expand Up @@ -1422,8 +1420,7 @@ public void testMutatingOutputThenOutputDoFnError() throws Exception {
}
}));

thrown.expect(PipelineExecutionException.class);
thrown.expectCause(isA(IllegalMutationException.class));
thrown.expect(IllegalMutationException.class);
thrown.expectMessage("output");
thrown.expectMessage("must not be mutated");
pipeline.run();
Expand Down Expand Up @@ -1472,8 +1469,7 @@ public void testMutatingOutputCoderDoFnError() throws Exception {
}
}));

thrown.expect(PipelineExecutionException.class);
thrown.expectCause(isA(IllegalMutationException.class));
thrown.expect(IllegalMutationException.class);
thrown.expectMessage("output");
thrown.expectMessage("must not be mutated");
pipeline.run();
Expand All @@ -1499,7 +1495,7 @@ public void testMutatingInputDoFnError() throws Exception {
}));

thrown.expect(IllegalMutationException.class);
thrown.expectMessage("input");
thrown.expectMessage("Input");
thrown.expectMessage("must not be mutated");
pipeline.run();
}
Expand All @@ -1523,7 +1519,7 @@ public void testMutatingInputCoderDoFnError() throws Exception {
}));

thrown.expect(IllegalMutationException.class);
thrown.expectMessage("input");
thrown.expectMessage("Input");
thrown.expectMessage("must not be mutated");
pipeline.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void withLambdaAndNoTypeDescriptorShouldThrow() {

values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));

thrown.expect(PipelineExecutionException.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
thrown.expectMessage("Cannot provide a coder for type variable K");
thrown.expectMessage("the actual type is unknown due to erasure.");
Expand Down