diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 17cf5b38d1a4..d87e39660abb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -225,7 +225,7 @@ public interface PipelineOptions { @Description("The pipeline runner that will be used to execute the pipeline. " + "For registered runners, the class name can be specified, otherwise the fully " + "qualified name needs to be specified.") - @Default.Class(DirectPipelineRunner.class) + @Default.Class(InProcessPipelineRunner.class) Class> getRunner(); void setRunner(Class> kls); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java index 0852269988f2..bb3d501f5d11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -113,17 +112,16 @@ public CommittedBundle commit(Instant synchronizedProcessingTime) { try { detector.verifyUnmodified(); } catch (IllegalMutationException exn) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - underlying.getPCollection().getProducingTransformInternal().getFullName(), - exn.getSavedValue(), - exn.getNewValue()), + throw new IllegalMutationException( + String.format( + "PTransform %s mutated value %s after it was output (new value was %s)." + + " Values must not be mutated in any way after being output.", + underlying.getPCollection().getProducingTransformInternal().getFullName(), exn.getSavedValue(), - exn.getNewValue(), - exn)); + exn.getNewValue()), + exn.getSavedValue(), + exn.getNewValue(), + exn); } } return underlying.commit(synchronizedProcessingTime); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 62c690959249..e2d434237e44 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.RestoreSystemProperties; @@ -60,7 +61,7 @@ @RunWith(JUnit4.class) public class PipelineOptionsFactoryTest { private static final Class> DEFAULT_RUNNER_CLASS = - DirectPipelineRunner.class; + InProcessPipelineRunner.class; @Rule public ExpectedException expectedException = ExpectedException.none(); @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java index dfda5284e1cd..459272e67e16 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -87,7 +87,7 @@ public void testDynamicAs() { @Test public void testDefaultRunnerIsSet() { - assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); + assertEquals(InProcessPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index 7690d2ba88dc..a778a0de3497 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -128,45 +128,46 @@ public void testCompositeCapture() throws Exception { final EnumSet left = EnumSet.noneOf(TransformsSeen.class); - p.traverseTopologically(new Pipeline.PipelineVisitor() { - @Override - public void enterCompositeTransform(TransformTreeNode node) { - PTransform transform = node.getTransform(); - if (transform instanceof Sample.SampleAny) { - assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); - } else if (transform instanceof Write.Bound) { - assertTrue(visited.add(TransformsSeen.WRITE)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); - } - assertThat(transform, not(instanceOf(Read.Bounded.class))); - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - PTransform transform = node.getTransform(); - if (transform instanceof Sample.SampleAny) { - assertTrue(left.add(TransformsSeen.SAMPLE_ANY)); - } - } - - @Override - public void visitTransform(TransformTreeNode node) { - PTransform transform = node.getTransform(); - // Pick is a composite, should not be visited here. - assertThat(transform, not(instanceOf(Sample.SampleAny.class))); - assertThat(transform, not(instanceOf(Write.Bound.class))); - if (transform instanceof Read.Bounded) { - assertTrue(visited.add(TransformsSeen.READ)); - } - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - } - }); + p.traverseTopologically( + new Pipeline.PipelineVisitor() { + @Override + public void enterCompositeTransform(TransformTreeNode node) { + PTransform transform = node.getTransform(); + if (transform instanceof Sample.SampleAny) { + assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); + } else if (transform instanceof Write.Bound) { + assertTrue(visited.add(TransformsSeen.WRITE)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); + } + assertThat(transform, not(instanceOf(Read.Bounded.class))); + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform transform = node.getTransform(); + if (transform instanceof Sample.SampleAny) { + assertTrue(left.add(TransformsSeen.SAMPLE_ANY)); + } + } + + @Override + public void visitTransform(TransformTreeNode node) { + PTransform transform = node.getTransform(); + // Pick is a composite, should not be visited here. + assertThat(transform, not(instanceOf(Sample.SampleAny.class))); + assertThat(transform, not(instanceOf(Write.Bound.class))); + if (transform instanceof Read.Bounded + && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) { + assertTrue(visited.add(TransformsSeen.READ)); + } + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) {} + }); assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class))); assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY))); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java index 7720589b49a9..b3a7d15ae33a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java @@ -55,7 +55,7 @@ public class EncodabilityEnforcementFactoryTest { public void encodeFailsThrows() { TestPipeline p = TestPipeline.create(); PCollection unencodable = - p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder())); + p.apply(Create.of().withCoder(new RecordNoEncodeCoder())); AppliedPTransform consumer = unencodable.apply(Count.globally()).getProducingTransformInternal(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java index 386eacc377d7..06e71b8c1e1d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -31,7 +30,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -163,10 +161,9 @@ public void mutationAfterAddRootBundleThrows() { root.add(WindowedValue.valueInGlobalWindow(array)); array[1] = 2; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle committed = root.commit(Instant.now()); + root.commit(Instant.now()); } @Test @@ -184,10 +181,9 @@ public void mutationAfterAddKeyedBundleThrows() { keyed.add(windowedArray); array[0] = Byte.MAX_VALUE; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle committed = keyed.commit(Instant.now()); + keyed.commit(Instant.now()); } @Test @@ -205,10 +201,9 @@ public void mutationAfterAddCreateBundleThrows() { intermediate.add(windowedArray); array[2] = -3; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); - CommittedBundle committed = intermediate.commit(Instant.now()); + intermediate.commit(Instant.now()); } private static class IdentityDoFn extends DoFn { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 44154e62e4f0..83e0f2cbf002 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -26,7 +26,6 @@ import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.isA; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.hamcrest.core.AnyOf.anyOf; @@ -36,7 +35,6 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; @@ -1119,7 +1117,7 @@ public void testSideOutputUnknownCoder() throws Exception { input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); - thrown.expect(PipelineExecutionException.class); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder"); pipeline.run(); } @@ -1422,8 +1420,7 @@ public void testMutatingOutputThenOutputDoFnError() throws Exception { } })); - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("output"); thrown.expectMessage("must not be mutated"); pipeline.run(); @@ -1472,8 +1469,7 @@ public void testMutatingOutputCoderDoFnError() throws Exception { } })); - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("output"); thrown.expectMessage("must not be mutated"); pipeline.run(); @@ -1499,7 +1495,7 @@ public void testMutatingInputDoFnError() throws Exception { })); thrown.expect(IllegalMutationException.class); - thrown.expectMessage("input"); + thrown.expectMessage("Input"); thrown.expectMessage("must not be mutated"); pipeline.run(); } @@ -1523,7 +1519,7 @@ public void testMutatingInputCoderDoFnError() throws Exception { })); thrown.expect(IllegalMutationException.class); - thrown.expectMessage("input"); + thrown.expectMessage("Input"); thrown.expectMessage("must not be mutated"); pipeline.run(); } diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java index a0d1a6304254..1ffb1476eeb0 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -65,7 +64,7 @@ public void withLambdaAndNoTypeDescriptorShouldThrow() { values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s))); - thrown.expect(PipelineExecutionException.class); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); thrown.expectMessage("Cannot provide a coder for type variable K"); thrown.expectMessage("the actual type is unknown due to erasure.");