diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index 90b8dc3f7761..268b793820d7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -172,7 +172,7 @@ public interface WordCountOptions extends PipelineOptions { void setOutput(String value); } - static void runWordCount(WordCountOptions options) { + public static void runWordCount(WordCountOptions options) { Pipeline p = Pipeline.create(options); // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the diff --git a/sdks/java/io/azure/build.gradle b/sdks/java/io/azure/build.gradle index 6a26bd797cef..7847057ffe0d 100644 --- a/sdks/java/io/azure/build.gradle +++ b/sdks/java/io/azure/build.gradle @@ -16,6 +16,8 @@ * limitations under the License. */ +import groovy.json.JsonOutput + plugins { id 'org.apache.beam.module' } @@ -29,6 +31,8 @@ ext.summary = "IO library to read and write Azure services from Beam." repositories { jcenter() } +evaluationDependsOn(":examples:java") + dependencies { compile library.java.vendored_guava_26_0_jre compile project(path: ":sdks:java:core", configuration: "shadow") @@ -38,7 +42,36 @@ dependencies { compile "commons-io:commons-io:2.6" compile library.java.slf4j_api testCompile project(path: ":sdks:java:core", configuration: "shadowTest") + testCompile project(path: ":sdks:java:testing:test-utils", configuration: "testRuntime") + testCompile project(path: ":examples:java", configuration: "testRuntime") testCompile library.java.mockito_core testCompile library.java.junit testRuntimeOnly library.java.slf4j_jdk14 } + +/** + * These are integration tests with the real Azure service and the DirectRunner. 
+ */
+task integrationTest(type: Test, dependsOn: processTestResources) {
+  group = "Verification"
+  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' // override with -PgcpProject=<id>
+  def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests' // override with -PgcpTempRoot=<location>; findProperty yields null when unset, so the default applies
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ // options reach the test JVM as a JSON array in this system property
+    "--runner=DirectRunner",
+    "--project=${gcpProject}",
+    "--tempRoot=${gcpTempRoot}",
+  ])
+
+  // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
+  outputs.upToDateWhen { false }
+
+  include '**/AzureWordCountIT.class' // only this integration test class is selected
+  maxParallelForks 4
+  classpath = sourceSets.test.runtimeClasspath
+  testClassesDirs = sourceSets.test.output.classesDirs
+  testClassesDirs += files(project(":examples:java").sourceSets.test.output.classesDirs) // also scan the test classes of :examples:java
+
+  useJUnit {
+    excludeCategories "org.apache.beam.sdk.testing.UsesKms"
+  }
+}
diff --git a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureWordCountIT.java b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureWordCountIT.java
new file mode 100644
index 000000000000..192599c7d55b
--- /dev/null
+++ b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureWordCountIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.azure.blobstore; + +import static org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Date; +import org.apache.beam.examples.WordCount; +import org.apache.beam.examples.WordCount.WordCountOptions; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.azure.options.BlobstoreOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.util.NumberedShardedFile; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** End-to-end tests of WordCount. */ +@RunWith(JUnit4.class) +public class AzureWordCountIT { + private static final String DEFAULT_INPUT = "azfs://ettasaccount/test-container/test-blob"; + // this comes from running with the input file sdks/java/io/azure/test/resources/in.txt + private static final String DEFAULT_OUTPUT_CHECKSUM = "9725c097a55d3d3d7cc17936b0839c26dbb4991a"; + + /** + * Options for the WordCount Integration Test. + * + *
Combines the test-pipeline, WordCount and Blobstore options; it declares no options of its own.
+ * The expected output checksum is kept in the DEFAULT_OUTPUT_CHECKSUM constant of the test class.
+ */
+ public interface AzureWordCountITOptions
+ extends TestPipelineOptions, WordCountOptions, BlobstoreOptions {} // empty body: every option is inherited from the three parent interfaces
+
+ @BeforeClass // JUnit 4 class-level fixture: runs once, before the test methods of this class
+ public static void setUp() {
+ PipelineOptionsFactory.register(TestPipelineOptions.class); // registers the test options interface with the factory; NOTE(review): AzureWordCountITOptions itself is not registered here — confirm that is sufficient
+ }
+
+ @Test
+ public void testE2EWordCount() throws Exception {
+ AzureWordCountITOptions options =
+ TestPipeline.testingPipelineOptions().as(AzureWordCountITOptions.class);
+ options.setAzureConnectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"));
+
+ options.setInputFile(DEFAULT_INPUT);
+ options.setOutput(
+ FileSystems.matchNewResource(options.getTempRoot(), true)
+ .resolve(
+ String.format("WordCountIT-%tF-%