From c5c707c408f5b88d820c94c92b5db7ec7f93fdbd Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 2 May 2016 13:25:40 -0700 Subject: [PATCH 1/3] beam-wide: blacklist Throwables.propagate and remove uses This is a forward-port of https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/232 --- .../apache/beam/examples/common/DataflowExampleUtils.java | 3 +-- .../runners/direct/ImmutabilityCheckingBundleFactory.java | 3 +-- .../apache/beam/runners/direct/InProcessPipelineRunner.java | 6 ++++-- .../beam/runners/direct/InProcessSideInputContainer.java | 3 +-- .../org/apache/beam/runners/direct/TransformExecutor.java | 4 +--- .../apache/beam/runners/dataflow/DataflowPipelineJob.java | 3 +-- .../java/build-tools/src/main/resources/beam/checkstyle.xml | 6 ++++++ .../core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java | 3 +-- .../org/apache/beam/sdk/options/PipelineOptionsFactory.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/DoFnReflector.java | 3 +-- .../beam/sdk/transforms/IntraBundleParallelization.java | 4 ++-- .../org/apache/beam/sdk/util/BigQueryTableInserter.java | 3 +-- .../test/java/org/apache/beam/sdk/util/ReduceFnTester.java | 3 +-- .../test/java/org/apache/beam/sdk/util/TriggerTester.java | 4 +--- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 3 +-- 15 files changed, 25 insertions(+), 30 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java index 7ac71d300b99..fb4f3bfb413c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java @@ -51,7 +51,6 @@ import com.google.api.services.pubsub.model.Subscription; import com.google.api.services.pubsub.model.Topic; import com.google.common.base.Strings; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; @@ -116,7 +115,7 @@ public void setup() throws IOException { Thread.currentThread().interrupt(); // Ignore InterruptedException } - Throwables.propagate(lastException); + throw new RuntimeException(lastException); } /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index 2103ad3b2b91..3b3821199beb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; -import com.google.api.client.util.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; @@ -100,7 +99,7 @@ public UncommittedBundle add(WindowedValue element) { mutationDetectors.put( element, MutationDetectors.forValueWithCoder(element.getValue(), coder)); } catch (CoderException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } underlying.add(element); return this; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java index bb8c0de68613..19e9f47de568 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -256,7 +255,10 @@ public InProcessPipelineResult run(Pipeline pipeline) { } catch (UserCodeException userException) { throw new PipelineExecutionException(userException.getCause()); } catch (Throwable t) { - Throwables.propagate(t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + throw new RuntimeException(t); } } return result; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index f4980ef1546c..b01cd484c2e2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; -import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -172,7 +171,7 @@ private void updatePCollectionViewWindowValues( future.set(Collections.>emptyList()); } } catch (ExecutionException e) { - Throwables.propagate(e.getCause()); + throw new RuntimeException(e.getCause()); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 8346e89f5af4..661c3f9e8f72 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -23,8 +23,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; -import com.google.common.base.Throwables; - import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; @@ -119,7 +117,7 @@ public InProcessTransformResult call() { return result; } catch (Throwable t) { onComplete.handleThrowable(inputBundle, t); - throw Throwables.propagate(t); + throw new RuntimeException(t); } finally { transformEvaluationState.complete(this); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 69565ac6573e..0f42148a0a4c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -41,7 +41,6 @@ import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -356,7 +355,7 @@ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - throw Throwables.propagate(e); + throw new RuntimeException(e); } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml index 31717ffda967..2a4f8327fec2 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml @@ -59,6 +59,12 @@ page at http://checkstyle.sourceforge.net/config.html --> + + + + + + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 66d1d4335505..46464614221a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -51,7 +51,6 @@ import com.google.api.services.pubsub.model.ReceivedMessage; import com.google.api.services.pubsub.model.Subscription; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.joda.time.Duration; @@ -814,7 +813,7 @@ public void processElement(ProcessContext c) throws IOException { } } if (finallyBlockException != null) { - Throwables.propagate(finallyBlockException); + throw new RuntimeException(finallyBlockException); } for (PubsubMessage message : messages) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 5fc7312d8ef3..5f2dd1129458 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.StringUtils; import org.apache.beam.sdk.util.common.ReflectHelpers; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -32,7 +33,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Strings; -import com.google.common.base.Throwables; import com.google.common.collect.Collections2; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableListMultimap; @@ -991,7 +991,7 @@ private static List validateClass(Class void invoke(Method m, throw UserCodeException.wrap(e.getCause()); } catch (IllegalAccessException | IllegalArgumentException e) { // Exception in our code. - throw Throwables.propagate(e); + throw new RuntimeException(e); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java index 4b3afb426ea0..62c09c2158ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java @@ -221,7 +221,7 @@ public void processElement(final ProcessContext c) throws Exception { } if (failure.get() != null) { - throw Throwables.propagate(failure.get()); + throw new RuntimeException(failure.get()); } executor.submit(new Runnable() { @@ -246,7 +246,7 @@ public void finishBundle(Context c) throws Exception { // processElement calls have finished. workTickets.acquire(maxParallelism); if (failure.get() != null) { - throw Throwables.propagate(failure.get()); + throw new RuntimeException(failure.get()); } doFn.finishBundle(c); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java index 86a0b5befca8..0493f1c7fa49 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java @@ -37,7 +37,6 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; @@ -275,7 +274,7 @@ public List call() throws IOException { Thread.currentThread().interrupt(); throw new IOException("Interrupted while inserting " + rowsToPublish); } catch (ExecutionException e) { - Throwables.propagate(e.getCause()); + throw new RuntimeException(e.getCause()); } if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java index f0d2a44a1ae9..f296d653074b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java @@ -58,7 +58,6 @@ import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -408,7 +407,7 @@ public WindowedValue apply(TimestampedValue input) { windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE))); return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING); } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } })); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index 10d3d3521ca5..0889b4f66ca7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; - import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -45,7 +44,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -257,7 +255,7 @@ public final void injectElements(Collection> values) th windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING)); } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index e6053116fbe5..4f353df865ff 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -44,7 +44,6 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; -import com.google.common.base.Throwables; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -634,7 +633,7 @@ public UnboundedKafkaReader createReader(PipelineOptions options, return new UnboundedKafkaReader( generateInitialSplits(1, options).get(0), checkpointMark); } catch (Exception e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } return new UnboundedKafkaReader(this, checkpointMark); From 046feca39256399c891e5ab386a67f1a8cd8deee Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 2 May 2016 13:39:29 -0700 Subject: [PATCH 2/3] fixup! beam-wide: blacklist Throwables.propagate and remove uses --- runners/direct-java/pom.xml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 12ba329db70c..15a4e9cdd100 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -278,20 +278,6 @@ ${project.version} - - com.google.http-client - google-http-client - ${google-clients.version} - - - - com.google.guava - guava-jdk5 - - - - com.google.http-client google-http-client-protobuf From 9e36094fb045085222e19b63718d8db13d602e19 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 2 May 2016 14:33:02 -0700 Subject: [PATCH 3/3] fixup! beam-wide: blacklist Throwables.propagate and remove uses --- .../java/org/apache/beam/runners/direct/TransformExecutor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 661c3f9e8f72..9e15c2aab830 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -117,6 +117,9 @@ public InProcessTransformResult call() { return result; } catch (Throwable t) { onComplete.handleThrowable(inputBundle, t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } throw new RuntimeException(t); } finally { transformEvaluationState.complete(this);