diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java index 91285856993f..8d85ab36bc6d 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java @@ -54,7 +54,6 @@ import com.google.cloud.dataflow.sdk.coders.SerializableCoder; import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation; import com.google.cloud.dataflow.sdk.io.Sink.Writer; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; @@ -307,18 +306,7 @@ public List splitIntoBundles(long desiredBundleSizeBytes, PipelineOption numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes); } catch (Exception e) { // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible. - - // 1. Try Dataflow's numWorkers, which will be 0 for other workers. - DataflowPipelineWorkerPoolOptions poolOptions = - options.as(DataflowPipelineWorkerPoolOptions.class); - if (poolOptions.getNumWorkers() > 0) { - LOG.warn("Estimated size of unavailable, using the number of workers {}", - poolOptions.getNumWorkers(), e); - numSplits = poolOptions.getNumWorkers(); - } else { - // 2. Default to 12 in the unknown case. - numSplits = 12; - } + numSplits = 12; } // If the desiredBundleSize or number of workers results in 1 split, simply return diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index b7f2afef5f03..deed9abd504c 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -33,8 +33,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.options.PubsubOptions; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -399,10 +398,9 @@ public String asPath() { * the stream. * *
<p>
When running with a {@link PipelineRunner} that only supports bounded - * {@link PCollection PCollections} (such as {@link DirectPipelineRunner} or - * {@link DataflowPipelineRunner} without {@code --streaming}), only a bounded portion of the - * input Pub/Sub stream can be processed. As such, either {@link Bound#maxNumRecords(int)} or - * {@link Bound#maxReadTime(Duration)} must be set. + * {@link PCollection PCollections} (such as {@link DirectPipelineRunner}), + * only a bounded portion of the input Pub/Sub stream can be processed. As such, either + * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set. */ public static class Read { /** @@ -728,7 +726,7 @@ private class PubsubReader extends DoFn { @Override public void processElement(ProcessContext c) throws IOException { Pubsub pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class)) + Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class)) .build(); String subscription; @@ -1004,7 +1002,7 @@ private class PubsubWriter extends DoFn { public void startBundle(Context c) { this.output = new ArrayList<>(); this.pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class)) + Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class)) .build(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index 0980d48b69da..ed25926c5b77 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; @@ -113,11 +112,10 @@ *
<h3>Permissions</h3>
*
<p>
When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files * on your local drive and remote text files on Google Cloud Storage that you have access to using - * your {@code gcloud} credentials. When running in the Dataflow service using - * {@link DataflowPipelineRunner}, the pipeline can only read and write files from GCS. For more - * information about permissions, see the Cloud Dataflow documentation on - * Security and - * Permissions. + * your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only + * read and write files from GCS. For more information about permissions, see the Cloud Dataflow + * documentation on Security + * and Permissions. */ public class TextIO { /** The default coder, which returns each line of the input file as a string. */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java index d4f63855a6cd..6231bd4010df 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java @@ -20,11 +20,11 @@ import com.google.api.services.dataflow.Dataflow; import com.google.cloud.dataflow.sdk.annotations.Experimental; import com.google.cloud.dataflow.sdk.util.DataflowPathValidator; +import com.google.cloud.dataflow.sdk.util.DataflowTransport; import com.google.cloud.dataflow.sdk.util.GcsStager; import com.google.cloud.dataflow.sdk.util.InstanceBuilder; import com.google.cloud.dataflow.sdk.util.PathValidator; import com.google.cloud.dataflow.sdk.util.Stager; -import com.google.cloud.dataflow.sdk.util.Transport; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -159,18 +159,11 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions { public static class DataflowClientFactory implements DefaultValueFactory { @Override public Dataflow create(PipelineOptions options) { - return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build(); + return DataflowTransport.newDataflowClient( + options.as(DataflowPipelineOptions.class)).build(); } } - /** - * Root URL for use with the Pubsub API. - */ - @Description("Root URL for use with the Pubsub API") - @Default.String("https://pubsub.googleapis.com") - String getPubsubRootUrl(); - void setPubsubRootUrl(String value); - /** * Whether to update the currently running pipeline with the same name as this one. * diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java index 23ca0e7c409c..dbfafd1716a9 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java @@ -33,7 +33,7 @@ public interface DataflowPipelineOptions extends PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions, DataflowPipelineWorkerPoolOptions, BigQueryOptions, GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions, - DataflowProfilingOptions { + DataflowProfilingOptions, PubsubOptions { @Description("Project id. Required when running a Dataflow in the cloud. 
" + "See https://cloud.google.com/storage/docs/projects for further details.") diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java new file mode 100644 index 000000000000..deb19e909f5c --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +/** + * Properties that can be set when using Pubsub with the Beam SDK. + */ +@Description("Options that are used to configure BigQuery. See " + + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.") +public interface PubsubOptions extends ApplicationNameOptions, GcpOptions, + PipelineOptions, StreamingOptions { + + /** + * Root URL for use with the Pubsub API. + */ + @Description("Root URL for use with the Pubsub API") + @Default.String("https://pubsub.googleapis.com") + @Hidden + String getPubsubRootUrl(); + void setPubsubRootUrl(String value); +} diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 5f43cc3eb91f..50ca36fe08c9 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -99,6 +99,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.util.CoderUtils; import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo; +import com.google.cloud.dataflow.sdk.util.DataflowTransport; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.InstanceBuilder; import com.google.cloud.dataflow.sdk.util.MonitoringUtil; @@ -107,7 +108,6 @@ import com.google.cloud.dataflow.sdk.util.PropertyNames; import com.google.cloud.dataflow.sdk.util.Reshuffle; import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal; -import com.google.cloud.dataflow.sdk.util.Transport; import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -444,7 +444,7 @@ private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniqu throw new RuntimeException("Should not specify the debuggee"); } - Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); + Clouddebugger debuggerClient = 
DataflowTransport.newClouddebuggerClient(options).build(); Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier); options.setDebuggee(debuggee); @@ -600,7 +600,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // regularly and need not be retried automatically. DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(options.getProject(), jobResult.getId(), - Transport.newRawDataflowClient(options).build(), aggregatorTransforms); + DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms); // If the service returned client request id, the SDK needs to compare it // with the original id generated in the request, if they are not the same diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java new file mode 100644 index 000000000000..18e66549df0a --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory; +import static com.google.cloud.dataflow.sdk.util.Transport.getTransport; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.services.clouddebugger.v2.Clouddebugger; +import com.google.api.services.dataflow.Dataflow; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.collect.ImmutableList; + +import java.net.MalformedURLException; +import java.net.URL; + +/** + * Helpers for cloud communication. + */ +public class DataflowTransport { + + + private static class ApiComponents { + public String rootUrl; + public String servicePath; + + public ApiComponents(String root, String path) { + this.rootUrl = root; + this.servicePath = path; + } + } + + private static ApiComponents apiComponentsFromUrl(String urlString) { + try { + URL url = new URL(urlString); + String rootUrl = url.getProtocol() + "://" + url.getHost() + + (url.getPort() > 0 ? ":" + url.getPort() : ""); + return new ApiComponents(rootUrl, url.getPath()); + } catch (MalformedURLException e) { + throw new RuntimeException("Invalid URL: " + urlString); + } + } + + /** + * Returns a Google Cloud Dataflow client builder. 
+ */ + public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) { + String servicePath = options.getDataflowEndpoint(); + ApiComponents components; + if (servicePath.contains("://")) { + components = apiComponentsFromUrl(servicePath); + } else { + components = new ApiComponents(options.getApiRootUrl(), servicePath); + } + + return new Dataflow.Builder(getTransport(), + getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setRootUrl(components.rootUrl) + .setServicePath(components.servicePath) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { + return new Clouddebugger.Builder(getTransport(), + getJsonFactory(), + chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer())) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + /** + * Returns a Dataflow client that does not automatically retry failed + * requests. + */ + public static Dataflow.Builder + newRawDataflowClient(DataflowPipelineOptions options) { + return newDataflowClient(options) + .setHttpRequestInitializer(options.getGcpCredential()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + private static HttpRequestInitializer chainHttpRequestInitializer( + Credential credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return httpRequestInitializer; + } else { + return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + } + } +} diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java index 27f61fab1774..d5cdb2439a2b 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java @@ -24,14 +24,11 @@ import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.clouddebugger.v2.Clouddebugger; -import com.google.api.services.dataflow.Dataflow; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.storage.Storage; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.GcsOptions; +import com.google.cloud.dataflow.sdk.options.PubsubOptions; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.collect.ImmutableList; @@ -92,10 +89,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } /** - * Returns a BigQuery client builder. - * - *
<p>
Note: this client's endpoint is not modified by the - * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option. + * Returns a BigQuery client builder using the specified {@link BigQueryOptions}. */ public static Bigquery.Builder newBigQueryClient(BigQueryOptions options) { @@ -109,13 +103,10 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } /** - * Returns a Pubsub client builder. - * - *
<p>
Note: this client's endpoint is not modified by the - * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option. + * Returns a Pubsub client builder using the specified {@link PubsubOptions}. */ public static Pubsub.Builder - newPubsubClient(DataflowPipelineOptions options) { + newPubsubClient(PubsubOptions options) { return new Pubsub.Builder(getTransport(), getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), @@ -127,53 +118,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } /** - * Returns a Google Cloud Dataflow client builder. - */ - public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) { - String servicePath = options.getDataflowEndpoint(); - ApiComponents components; - if (servicePath.contains("://")) { - components = apiComponentsFromUrl(servicePath); - } else { - components = new ApiComponents(options.getApiRootUrl(), servicePath); - } - - return new Dataflow.Builder(getTransport(), - getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setApplicationName(options.getAppName()) - .setRootUrl(components.rootUrl) - .setServicePath(components.servicePath) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { - return new Clouddebugger.Builder(getTransport(), - getJsonFactory(), - chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer())) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a Dataflow client that does not automatically retry failed - * requests. - */ - public static Dataflow.Builder - newRawDataflowClient(DataflowPipelineOptions options) { - return newDataflowClient(options) - .setHttpRequestInitializer(options.getGcpCredential()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a Cloud Storage client builder. - * - *
<p>
Note: this client's endpoint is not modified by the - * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option. + * Returns a Cloud Storage client builder using the specified {@link GcsOptions}. */ public static Storage.Builder newStorageClient(GcsOptions options) { diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java new file mode 100644 index 000000000000..1bd8a85437d1 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.io; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.util.GcsUtil; +import com.google.cloud.dataflow.sdk.util.TestCredential; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; +import com.google.common.collect.ImmutableList; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.List; + +/** + * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms. 
+ */ +@RunWith(JUnit4.class) +public class DataflowTextIOTest { + + private TestDataflowPipelineOptions buildTestPipelineOptions() { + TestDataflowPipelineOptions options = + PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setGcpCredential(new TestCredential()); + return options; + } + + private GcsUtil buildMockGcsUtil() throws IOException { + GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); + + // Any request to open gets a new bogus channel + Mockito + .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) + .then(new Answer() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + // Any request for expansion returns a list containing the original GcsPath + // This is required to pass validation that occurs in TextIO during apply() + Mockito + .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) + .then(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + + return mockGcsUtil; + } + + /** + * This tests a few corner cases that should not crash. + */ + @Test + public void testGoodWildcards() throws Exception { + TestDataflowPipelineOptions options = buildTestPipelineOptions(); + options.setGcsUtil(buildMockGcsUtil()); + + Pipeline pipeline = Pipeline.create(options); + + applyRead(pipeline, "gs://bucket/foo"); + applyRead(pipeline, "gs://bucket/foo/"); + applyRead(pipeline, "gs://bucket/foo/*"); + applyRead(pipeline, "gs://bucket/foo/?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]"); + applyRead(pipeline, "gs://bucket/foo/*baz*"); + applyRead(pipeline, "gs://bucket/foo/*baz?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); + applyRead(pipeline, "gs://bucket/foo/baz/*"); + applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); + applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); + applyRead(pipeline, "gs://bucket/foo*/baz"); + applyRead(pipeline, "gs://bucket/foo?/baz"); + applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); + + // Check that running doesn't fail. 
+ pipeline.run(); + } + + private void applyRead(Pipeline pipeline, String path) { + pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); + } +} + diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java index 41967409d0b8..dcdb229fd4f4 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java @@ -51,7 +51,7 @@ import com.google.api.services.datastore.client.QuerySplitter; import com.google.cloud.dataflow.sdk.io.DatastoreIO.DatastoreReader; import com.google.cloud.dataflow.sdk.io.DatastoreIO.DatastoreWriter; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; @@ -69,14 +69,11 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; -import javax.annotation.Nullable; - /** * Tests for {@link DatastoreIO}. */ @@ -112,12 +109,9 @@ public void setUp() { /** * Helper function to create a test {@code DataflowPipelineOptions}. */ - static final DataflowPipelineOptions testPipelineOptions(@Nullable Integer numWorkers) { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + static final GcpOptions testPipelineOptions() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); options.setGcpCredential(new TestCredential()); - if (numWorkers != null) { - options.setNumWorkers(numWorkers); - } return options; } @@ -217,13 +211,13 @@ public void testSinkValidationFailsWithNoDataset() throws Exception { thrown.expect(NullPointerException.class); thrown.expectMessage("Dataset"); - sink.validate(testPipelineOptions(null)); + sink.validate(testPipelineOptions()); } @Test public void testSinkValidationSucceedsWithDataset() throws Exception { DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET); - sink.validate(testPipelineOptions(null)); + sink.validate(testPipelineOptions()); } @Test @@ -254,7 +248,7 @@ public void testQuerySplitBasic() throws Exception { .withMockSplitter(splitter) .withMockEstimateSizeBytes(8 * 1024L); - List bundles = io.splitIntoBundles(1024, testPipelineOptions(null)); + List bundles = io.splitIntoBundles(1024, testPipelineOptions()); assertEquals(8, bundles.size()); for (int i = 0; i < 8; ++i) { DatastoreIO.Source bundle = bundles.get(i); @@ -274,7 +268,7 @@ public void testSourceWithNamespace() throws Exception { .withMockSplitter(splitter) .withMockEstimateSizeBytes(8 * 1024L); - io.splitIntoBundles(1024, testPipelineOptions(null)); + io.splitIntoBundles(1024, testPipelineOptions()); PartitionId partition = PartitionId.newBuilder().setNamespace(NAMESPACE).build(); verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class)); @@ -300,7 +294,7 @@ public void testQuerySplitWithZeroSize() throws Exception { .withMockSplitter(splitter) .withMockEstimateSizeBytes(0L); - List bundles = io.splitIntoBundles(1024, testPipelineOptions(null)); + List bundles = io.splitIntoBundles(1024, testPipelineOptions()); 
assertEquals(1, bundles.size()); verify(splitter, never()) .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)); @@ -328,7 +322,7 @@ public void testQueryDoesNotSplitWithLimitSet() throws Exception { initialSource .withQuery(query) .withMockSplitter(splitter) - .splitIntoBundles(1024, testPipelineOptions(null)); + .splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); assertEquals(query, bundles.get(0).getQuery()); @@ -356,7 +350,7 @@ public void testQuerySplitterThrows() throws Exception { .withQuery(query) .withMockSplitter(splitter) .withMockEstimateSizeBytes(10240L) - .splitIntoBundles(1024, testPipelineOptions(null)); + .splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); assertEquals(query, bundles.get(0).getQuery()); @@ -371,44 +365,6 @@ public void testQuerySplitSizeUnavailable() throws Exception { KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); Query query = Query.newBuilder().addKind(mykind).build(); - List mockSplits = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - mockSplits.add( - Query.newBuilder() - .addKind(mykind) - .setFilter( - DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL, - Value.newBuilder().setIntegerValue(i).build())) - .build()); - } - - QuerySplitter splitter = mock(QuerySplitter.class); - when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class))) - .thenReturn(mockSplits); - - DatastoreIO.Source io = initialSource - .withQuery(query) - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(8 * 1024L); - - DatastoreIO.Source spiedIo = spy(io); - when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class))).thenThrow(new IOException()); - - List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions(2)); - assertEquals(2, bundles.size()); - for (int i = 0; i < 2; ++i) { - DatastoreIO.Source bundle = bundles.get(i); - Query bundleQuery = bundle.getQuery(); - assertEquals("mykind", bundleQuery.getKind(0).getName()); - assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue()); - } - } - - @Test - public void testQuerySplitNoWorkers() throws Exception { - KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); - Query query = Query.newBuilder().addKind(mykind).build(); - List mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build()); QuerySplitter splitter = mock(QuerySplitter.class); @@ -424,7 +380,7 @@ public void testQuerySplitNoWorkers() throws Exception { when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class))) .thenThrow(new NoSuchElementException()); - List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions(0)); + List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); verify(splitter, never()) .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 6d4d6e7efe65..5a9a8c631be4 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -37,15 +37,11 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import 
com.google.cloud.dataflow.sdk.testing.SourceTestUtils; -import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.CoderUtils; -import com.google.cloud.dataflow.sdk.util.GcsUtil; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.TestCredential; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.common.collect.ImmutableList; @@ -56,9 +52,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.BufferedReader; import java.io.File; @@ -66,11 +59,8 @@ import java.io.FileReader; import java.io.IOException; import java.io.PrintStream; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -85,42 +75,6 @@ public class TextIOTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); - private GcsUtil buildMockGcsUtil() throws IOException { - GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); - - // Any request to open gets a new bogus channel - Mockito - .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) - .then(new Answer() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - - // Any request for expansion returns a list containing the original GcsPath - // This is required to pass validation that occurs in TextIO during apply() - Mockito - .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) - .then(new Answer>() { - @Override - public List answer(InvocationOnMock invocation) throws Throwable { - return ImmutableList.of((GcsPath) invocation.getArguments()[0]); - } - }); - - return mockGcsUtil; - } - - private TestDataflowPipelineOptions buildTestPipelineOptions() { - TestDataflowPipelineOptions options = - PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); - options.setGcpCredential(new TestCredential()); - return options; - } - void runTestRead(T[] expected, Coder coder) throws Exception { File tmpFile = tmpFolder.newFile("file.txt"); String filename = tmpFile.getPath(); @@ -328,39 +282,6 @@ public void testUnsupportedFilePattern() throws IOException { input.apply(TextIO.Write.to(filename)); } - /** - * This tests a few corner cases that should not crash. 
- */ - @Test - public void testGoodWildcards() throws Exception { - TestDataflowPipelineOptions options = buildTestPipelineOptions(); - options.setGcsUtil(buildMockGcsUtil()); - - Pipeline pipeline = Pipeline.create(options); - - applyRead(pipeline, "gs://bucket/foo"); - applyRead(pipeline, "gs://bucket/foo/"); - applyRead(pipeline, "gs://bucket/foo/*"); - applyRead(pipeline, "gs://bucket/foo/?"); - applyRead(pipeline, "gs://bucket/foo/[0-9]"); - applyRead(pipeline, "gs://bucket/foo/*baz*"); - applyRead(pipeline, "gs://bucket/foo/*baz?"); - applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); - applyRead(pipeline, "gs://bucket/foo/baz/*"); - applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); - applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); - applyRead(pipeline, "gs://bucket/foo*/baz"); - applyRead(pipeline, "gs://bucket/foo?/baz"); - applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); - - // Check that running doesn't fail. - pipeline.run(); - } - - private void applyRead(Pipeline pipeline, String path) { - pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); - } - /** * Recursive wildcards are not supported. * This tests "**". diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java index 320160814c26..c2f0bb85f90a 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java @@ -22,8 +22,7 @@ import static org.junit.Assert.assertNull; import com.google.api.services.bigquery.Bigquery.Datasets.Delete; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Create; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get; +import com.google.api.services.storage.Storage; import com.google.cloud.dataflow.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.cloud.dataflow.sdk.util.Transport; @@ -37,104 +36,104 @@ /** Tests for {@link GoogleApiDebugOptions}. 
*/ @RunWith(JUnit4.class) public class GoogleApiDebugOptionsTest { + private static final String STORAGE_GET_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; + private static final String STORAGE_GET_AND_LIST_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"," + + "\"Objects.List\":\"ListTraceDestination\"}"; + private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}"; + @Test public void testWhenTracingMatches() throws Exception { - String[] args = - new String[] {"--googleApiTrace={\"Projects.Jobs.Get\":\"GetTraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); - assertNotNull(options.getGoogleApiTrace()); - Get request = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get request = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("GetTraceDestination", request.get("$trace")); } @Test public void testWhenTracingDoesNotMatch() throws Exception { - String[] args = new String[] {"--googleApiTrace={\"Projects.Jobs.Create\":\"testToken\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get request = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.List request = + Transport.newStorageClient(options).build().objects().list("testProjectId"); assertNull(request.get("$trace")); } @Test public void testWithMultipleTraces() throws Exception { - String[] args = new String[] { - "--googleApiTrace={\"Projects.Jobs.Create\":\"CreateTraceDestination\"," - + "\"Projects.Jobs.Get\":\"GetTraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("GetTraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertEquals("CreateTraceDestination", createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("ListTraceDestination", listRequest.get("$trace")); } @Test - public void testMatchingAllDataflowCalls() throws Exception { - String[] args = new String[] {"--googleApiTrace={\"Dataflow\":\"TraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + public void testMatchingAllCalls() throws Exception { + 
String[] args = new String[] {STORAGE_TRACE}; + GcsOptions options = + PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertEquals("TraceDestination", createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("TraceDestination", listRequest.get("$trace")); } @Test public void testMatchingAgainstClient() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newDataflowClient(options).build(), "TraceDestination")); + Transport.newStorageClient(options).build(), "TraceDestination")); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Delete deleteRequest = Transport.newBigQueryClient(options).build().datasets() - .delete("testProjectId", "testDatasetId"); + Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class)) + .build().datasets().delete("testProjectId", "testDatasetId"); assertNull(deleteRequest.get("$trace")); } @Test public void testMatchingAgainstRequestType() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newDataflowClient(options).build().projects().jobs() - .get("aProjectId", "aJobId"), "TraceDestination")); + Transport.newStorageClient(options).build().objects() + .get("aProjectId", "aObjectId"), "TraceDestination")); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertNull(createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(listRequest.get("$trace")); } @Test diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java index d9d62c8c7102..d8ba8e3d5c51 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java +++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java @@ -29,8 +29,6 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; @@ -829,20 +827,21 @@ public void testSetASingularAttributeUsingAListIsIgnoredWithoutStrictParsing() { @Test public void testSettingRunner() { - String[] args = new String[] {"--runner=BlockingDataflowPipelineRunner"}; + String[] args = new String[] {"--runner=DirectPipelineRunner"}; PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(BlockingDataflowPipelineRunner.class, options.getRunner()); + assertEquals(DirectPipelineRunner.class, options.getRunner()); } @Test public void testSettingRunnerFullName() { String[] args = - new String[] {String.format("--runner=%s", DataflowPipelineRunner.class.getName())}; + new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())}; PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(opts.getRunner(), DataflowPipelineRunner.class); + assertEquals(opts.getRunner(), DirectPipelineRunner.class); } + @Test public void testSettingUnknownRunner() { String[] args = new String[] {"--runner=UnknownRunner"}; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java index 6f119da7b0b5..d6a2b63a6a4b 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java @@ -23,10 +23,9 @@ import static org.junit.Assert.assertThat; import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.fasterxml.jackson.databind.ObjectMapper; @@ -50,31 +49,14 @@ public void testCreationUsingDefaults() { public void testCreationOfPipelineOptions() throws Exception { ObjectMapper mapper = new ObjectMapper(); String stringOptions = mapper.writeValueAsString(new String[]{ - "--runner=DataflowPipelineRunner", - "--project=testProject", - "--apiRootUrl=testApiRootUrl", - "--dataflowEndpoint=testDataflowEndpoint", - "--tempLocation=testTempLocation", - "--serviceAccountName=testServiceAccountName", - "--serviceAccountKeyfile=testServiceAccountKeyfile", - "--zone=testZone", - "--numWorkers=1", - "--diskSizeGb=2" + "--runner=DirectPipelineRunner", + "--project=testProject" }); System.getProperties().put("dataflowOptions", stringOptions); - DataflowPipelineOptions options = - TestPipeline.testingPipelineOptions().as(DataflowPipelineOptions.class); - assertEquals(DataflowPipelineRunner.class, options.getRunner()); - assertThat(options.getJobName(), startsWith("testpipelinetest0testcreationofpipelineoptions-")); - assertEquals("testProject", 
options.as(GcpOptions.class).getProject()); - assertEquals("testApiRootUrl", options.getApiRootUrl()); - assertEquals("testDataflowEndpoint", options.getDataflowEndpoint()); - assertEquals("testTempLocation", options.getTempLocation()); - assertEquals("testServiceAccountName", options.getServiceAccountName()); - assertEquals( - "testServiceAccountKeyfile", options.as(GcpOptions.class).getServiceAccountKeyfile()); - assertEquals("testZone", options.getZone()); - assertEquals(2, options.getDiskSizeGb()); + GcpOptions options = + TestPipeline.testingPipelineOptions().as(GcpOptions.class); + assertEquals(DirectPipelineRunner.class, options.getRunner()); + assertEquals(options.getProject(), "testProject"); } @Test diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java new file mode 100644 index 000000000000..b0b011d142a9 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.util.NoopPathValidator; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */ +@RunWith(JUnit4.class) +public class DataflowGroupByKeyTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey} + * is not expanded. 
This is used for verifying that even without expansion the proper errors show + * up. + */ + private Pipeline createTestServiceRunner() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setProject("someproject"); + options.setStagingLocation("gs://staging"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setDataflowClient(null); + return Pipeline.create(options); + } + + @Test + public void testInvalidWindowsService() { + Pipeline p = createTestServiceRunner(); + + List> ungroupedPairs = Arrays.asList(); + + PCollection> input = + p.apply(Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.>into( + Sessions.withGapDuration(Duration.standardMinutes(1)))); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("GroupByKey must have a valid Window merge function"); + input + .apply("GroupByKey", GroupByKey.create()) + .apply("GroupByKeyAgain", GroupByKey.>create()); + } + + @Test + public void testGroupByKeyServiceUnbounded() { + Pipeline p = createTestServiceRunner(); + + PCollection> input = + p.apply( + new PTransform>>() { + @Override + public PCollection> apply(PBegin input) { + return PCollection.>createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED) + .setTypeDescriptorInternal(new TypeDescriptor>() {}); + } + }); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage( + "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without " + + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey."); + + input.apply("GroupByKey", GroupByKey.create()); + } +} + diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java new file mode 100644 index 000000000000..c2a427344318 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.util.NoopPathValidator; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */ +@RunWith(JUnit4.class) +public class DataflowViewTest { + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + private Pipeline createTestBatchRunner() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setProject("someproject"); + options.setStagingLocation("gs://staging"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setDataflowClient(null); + return Pipeline.create(options); + } + + private Pipeline createTestStreamingRunner() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setStreaming(true); + options.setProject("someproject"); + options.setStagingLocation("gs://staging"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setDataflowClient(null); + return Pipeline.create(options); + } + + private void testViewUnbounded( + Pipeline pipeline, + PTransform>, ? extends PCollectionView> view) { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to create a side-input view from input"); + thrown.expectCause( + ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection"))); + pipeline + .apply( + new PTransform>>() { + @Override + public PCollection> apply(PBegin input) { + return PCollection.>createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED) + .setTypeDescriptorInternal(new TypeDescriptor>() {}); + } + }) + .apply(view); + } + + private void testViewNonmerging( + Pipeline pipeline, + PTransform>, ? 
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+    pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+        .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+            "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+        .apply(view);
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsListBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsListStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsListBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsListStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+}
+
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
index bb64f60199f0..6fb811e4bbb3 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
@@ -28,10 +28,8 @@
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.MapCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -41,7 +39,6 @@
 import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -241,21 +238,6 @@ public void testWindowFnInvalidation() {
             Duration.standardMinutes(1)))));
   }
 
-  /**
-   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
-   * is not expanded. This is used for verifying that even without expansion the proper errors show
-   * up.
-   */
-  private Pipeline createTestServiceRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
   private Pipeline createTestDirectRunner() {
     DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
     options.setRunner(DirectPipelineRunner.class);
@@ -281,25 +263,6 @@ public void testInvalidWindowsDirect() {
         .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
   }
 
-  @Test
-  public void testInvalidWindowsService() {
-    Pipeline p = createTestServiceRunner();
-
-    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(Create.of(ungroupedPairs)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            Sessions.withGapDuration(Duration.standardMinutes(1))));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("GroupByKey must have a valid Window merge function");
-    input
-        .apply("GroupByKey", GroupByKey.<String, Integer>create())
-        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
-  }
-
   @Test
   public void testRemerge() {
     Pipeline p = TestPipeline.create();
@@ -350,31 +313,6 @@ public PCollection<KV<String, Integer>> apply(PBegin input) {
     input.apply("GroupByKey", GroupByKey.<String, Integer>create());
   }
 
-  @Test
-  public void testGroupByKeyServiceUnbounded() {
-    Pipeline p = createTestServiceRunner();
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(
-            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
-              @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                    input.getPipeline(),
-                    WindowingStrategy.globalDefault(),
-                    PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
-              }
-            });
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
-        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
-
-    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
-  }
-
   /**
    * Tests that when two elements are combined via a GroupByKey their output timestamp agrees
    * with the windowing function customized to actually be the same as the default, the earlier of
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
index 64ec1764daac..69a4d2eba0e0 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
@@ -33,10 +33,8 @@
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -45,7 +43,6 @@
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -1333,27 +1330,6 @@ public void testViewGetName() {
     assertEquals("View.AsMultimap", View.asMultimap().getName());
   }
 
-  private Pipeline createTestBatchRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
-  private Pipeline createTestStreamingRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setStreaming(true);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
   private Pipeline createTestDirectRunner() {
     DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
     options.setRunner(DirectPipelineRunner.class);
@@ -1395,153 +1371,51 @@ private void testViewNonmerging(
         .apply(view);
   }
 
-  @Test
-  public void testViewUnboundedAsSingletonBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewUnboundedAsSingletonStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
   @Test
   public void testViewUnboundedAsSingletonDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asSingleton());
   }
 
-  @Test
-  public void testViewUnboundedAsIterableBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewUnboundedAsIterableStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
   @Test
   public void testViewUnboundedAsIterableDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asIterable());
   }
 
-  @Test
-  public void testViewUnboundedAsListBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewUnboundedAsListStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
-  }
-
   @Test
   public void testViewUnboundedAsListDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asList());
   }
 
-  @Test
-  public void testViewUnboundedAsMapBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMapStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
-  }
-
   @Test
   public void testViewUnboundedAsMapDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMap());
   }
-
-  @Test
-  public void testViewUnboundedAsMultimapBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMultimapStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
-  }
-
   @Test
   public void testViewUnboundedAsMultimapDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMultimap());
   }
 
-  @Test
-  public void testViewNonmergingAsSingletonBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewNonmergingAsSingletonStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
   @Test
   public void testViewNonmergingAsSingletonDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asSingleton());
   }
 
-  @Test
-  public void testViewNonmergingAsIterableBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewNonmergingAsIterableStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
   @Test
   public void testViewNonmergingAsIterableDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asIterable());
   }
 
-  @Test
-  public void testViewNonmergingAsListBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewNonmergingAsListStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
-  }
-
   @Test
   public void testViewNonmergingAsListDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asList());
   }
 
-  @Test
-  public void testViewNonmergingAsMapBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMapStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
-  }
-
   @Test
   public void testViewNonmergingAsMapDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<String, Integer>asMap());
   }
-
-  @Test
-  public void testViewNonmergingAsMultimapBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMultimapStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
-  }
-
   @Test
   public void testViewNonmergingAsMultimapDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<String, Integer>asMultimap());