Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f456095
implemented an azure blob storage filesystem
ettirapp Jul 10, 2020
137b777
added options to azure filesystem
ettirapp Aug 17, 2020
58f7f02
added experimental annotation
ettirapp Aug 17, 2020
1bbc947
adding mocks to azure filesystem tests
ettirapp Aug 17, 2020
4799c22
adding mocks to azure filesystem test
ettirapp Aug 17, 2020
5fb8001
resolving various reviewer comments
ettirapp Aug 17, 2020
8b4fd2a
adding mocks to filesystem test
ettirapp Aug 18, 2020
8f073fd
adding mocks to filesystem test
ettirapp Aug 18, 2020
6f30dc6
working on options and mocks
ettirapp Aug 19, 2020
7ec9694
adding options and unit tests
ettirapp Aug 19, 2020
26690cb
applied spotless
ettirapp Aug 19, 2020
4cf0f99
Update sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/…
ettirapp Aug 19, 2020
c2d91af
incorporating reviewer feeback
ettirapp Aug 19, 2020
4d14586
fixed a checkstyle issue, annotated tests in progress with @Ignore
ettirapp Aug 19, 2020
f97873d
adding javadoc
ettirapp Aug 19, 2020
9aea1b2
fixing a bug in AzureReadableSeekableByteChannel read
ettirapp Aug 19, 2020
aab24bf
removing logger from azure bytechannel
ettirapp Aug 19, 2020
e2f7eb2
removed non-serializable option
ettirapp Aug 19, 2020
4bc1857
removed non-serializable configuration option
ettirapp Aug 19, 2020
fb1467d
removed integration tests - they will be in a followup PR
ettirapp Aug 20, 2020
8daeb4d
working on integration tests for azure filesystem
ettirapp Aug 20, 2020
b228a32
adding integration tests
ettirapp Aug 20, 2020
b021a7b
rebasing
ettirapp Aug 20, 2020
91fff24
organizing imports
ettirapp Aug 20, 2020
8d75641
removed unused tests
ettirapp Aug 20, 2020
59d5c67
adding azure wordcount test
ettirapp Aug 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public interface WordCountOptions extends PipelineOptions {
void setOutput(String value);
}

static void runWordCount(WordCountOptions options) {
public static void runWordCount(WordCountOptions options) {
Pipeline p = Pipeline.create(options);

// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
Expand Down
33 changes: 33 additions & 0 deletions sdks/java/io/azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* limitations under the License.
*/

import groovy.json.JsonOutput

plugins {
id 'org.apache.beam.module'
}
Expand All @@ -29,6 +31,8 @@ ext.summary = "IO library to read and write Azure services from Beam."

repositories { jcenter() }

evaluationDependsOn(":examples:java")

dependencies {
compile library.java.vendored_guava_26_0_jre
compile project(path: ":sdks:java:core", configuration: "shadow")
Expand All @@ -38,7 +42,36 @@ dependencies {
compile "commons-io:commons-io:2.6"
compile library.java.slf4j_api
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":sdks:java:testing:test-utils", configuration: "testRuntime")
testCompile project(path: ":examples:java", configuration: "testRuntime")
testCompile library.java.mockito_core
testCompile library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
}

/**
* These are integration tests with the real Azure service and the DirectRunner.
*/
task integrationTest(type: Test, dependsOn: processTestResources) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempRoot = project.findProperty('System.getProperty("user.dir")') ?: 'gs://temp-storage-for-end-to-end-tests'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
outputs.upToDateWhen { false }

include '**/AzureWordCountIT.class'
maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
testClassesDirs += files(project(":examples:java").sourceSets.test.output.classesDirs)

useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.azure.blobstore;

import static org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Date;
import org.apache.beam.examples.WordCount;
import org.apache.beam.examples.WordCount.WordCountOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** End-to-end tests of WordCount. */
@RunWith(JUnit4.class)
public class AzureWordCountIT {
private static final String DEFAULT_INPUT = "azfs://ettasaccount/test-container/test-blob";
// this comes from running with the input file sdks/java/io/azure/test/resources/in.txt
private static final String DEFAULT_OUTPUT_CHECKSUM = "9725c097a55d3d3d7cc17936b0839c26dbb4991a";

/**
* Options for the WordCount Integration Test.
*
* <p>Define expected output file checksum to verify WordCount pipeline result with customized
* input.
*/
public interface AzureWordCountITOptions
extends TestPipelineOptions, WordCountOptions, BlobstoreOptions {}

@BeforeClass
public static void setUp() {
PipelineOptionsFactory.register(TestPipelineOptions.class);
}

@Test
public void testE2EWordCount() throws Exception {
AzureWordCountITOptions options =
TestPipeline.testingPipelineOptions().as(AzureWordCountITOptions.class);
options.setAzureConnectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"));

options.setInputFile(DEFAULT_INPUT);
options.setOutput(
FileSystems.matchNewResource(options.getTempRoot(), true)
.resolve(
String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve("results", StandardResolveOptions.RESOLVE_FILE)
.toString());
WordCount.runWordCount(options);
assertThat(
new NumberedShardedFile(options.getOutput() + "*-of-*"),
fileContentsHaveChecksum(DEFAULT_OUTPUT_CHECKSUM));
}
}