From 9eca6e144eacc8b8284ab50a0e1082232c020702 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Gajowy?=
Date: Wed, 21 Feb 2018 17:03:37 +0100
Subject: [PATCH 1/7] [BEAM-3734] Add XmlIOIT using sink and readFiles()
The test can be parametrized with charset, number of records and filename prefix.
---
.../beam/sdk/io/common/DeleteFileFn.java | 47 ++++++
.../sdk/io/common/IOTestPipelineOptions.java | 7 +
.../org/apache/beam/sdk/io/avro/AvroIOIT.java | 3 +-
.../sdk/io/common/FileBasedIOITHelper.java | 26 ----
.../org/apache/beam/sdk/io/text/TextIOIT.java | 3 +-
.../beam/sdk/io/tfrecord/TFRecordIOIT.java | 3 +-
sdks/java/io/xml/build.gradle | 11 ++
sdks/java/io/xml/pom.xml | 14 +-
.../java/org/apache/beam/sdk/io/xml/Bird.java | 85 ++++++++++
.../org/apache/beam/sdk/io/xml/XmlIOIT.java | 147 ++++++++++++++++++
.../org/apache/beam/sdk/io/xml/XmlIOTest.java | 63 --------
11 files changed, 316 insertions(+), 93 deletions(-)
create mode 100644 sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DeleteFileFn.java
create mode 100644 sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/Bird.java
create mode 100644 sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DeleteFileFn.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DeleteFileFn.java
new file mode 100644
index 000000000000..65aeb4d950ce
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DeleteFileFn.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Deletes every file matching the pattern carried by each input element, using the
+ * {@link FileSystems} API. Produces no output elements.
+ */
+public class DeleteFileFn extends DoFn<String, Void> {
+  @ProcessElement
+  public void processElement(ProcessContext c) throws IOException {
+    // One pattern is submitted, so getOnlyElement expects exactly one MatchResult back.
+    MatchResult match = Iterables
+        .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
+
+    Set<ResourceId> resourceIds = new HashSet<>();
+    for (MatchResult.Metadata metadataElem : match.metadata()) {
+      resourceIds.add(metadataElem.resourceId());
+    }
+    FileSystems.delete(resourceIds);
+  }
+}
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index b86020ec2784..070733771bf9 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -107,4 +107,11 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
String getCompressionType();
void setCompressionType(String compressionType);
+
+ /* Xml */
+ @Description("Xml file charset name")
+ @Default.String("UTF-8")
+ String getCharset();
+
+ void setCharset(String charset);
}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index f93d4dc4cba3..f28cc14501fc 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DeleteFileFn;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
@@ -119,7 +120,7 @@ public void writeThenReadAll() {
testFilenames.apply(
"Delete test files",
- ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index 40b04617d8ad..1771784f73ef 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -19,16 +19,8 @@
package org.apache.beam.sdk.io.common;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.util.Collections;
import java.util.Date;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -82,22 +74,4 @@ public void processElement(ProcessContext c) {
}
}
- /**
- * Deletes matching files using the FileSystems API.
- */
- public static class DeleteFileFn extends DoFn {
-
- @ProcessElement
- public void processElement(ProcessContext c) throws IOException {
- MatchResult match = Iterables
- .getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
-
- Set resourceIds = new HashSet<>();
- for (MatchResult.Metadata metadataElem : match.metadata()) {
- resourceIds.add(metadataElem.resourceId());
- }
-
- FileSystems.delete(resourceIds);
- }
- }
}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 3e67f6ab5803..8762b9f800df 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.DeleteFileFn;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
@@ -107,7 +108,7 @@ public void writeThenReadAll() {
testFilenames.apply(
"Delete test files",
- ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 99d23e0f432d..cfc5cd4641f0 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.io.common.DeleteFileFn;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
@@ -121,7 +122,7 @@ public void writeThenReadAll() {
.apply(Create.of(filenamePattern))
.apply(
"Delete test files",
- ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
readPipeline.run().waitUntilFinish();
}
diff --git a/sdks/java/io/xml/build.gradle b/sdks/java/io/xml/build.gradle
index 5e66ad96f35a..61352424141a 100644
--- a/sdks/java/io/xml/build.gradle
+++ b/sdks/java/io/xml/build.gradle
@@ -21,6 +21,15 @@ applyJavaNature()
description = "Apache Beam :: SDKs :: Java :: IO :: XML"
+/*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure that
+ * the following projects are evaluated before we evaluate this project. This is because
+ * we are attempting to reference the "sourceSets.test.output" directly.
+ * TODO: Swap to generating test artifacts which we can then rely on instead of
+ * the test outputs directly.
+ */
+evaluationDependsOn(":sdks:java:io:common")
+
dependencies {
compile library.java.guava
shadow project(path: ":sdks:java:core", configuration: "shadow")
@@ -29,6 +38,8 @@ dependencies {
shadow library.java.woodstox_core_asl
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":runners:direct-java", configuration: "shadow")
+ testCompile project(path: ":sdks:java:io:common", configuration: "shadow")
+ testCompile project(":sdks:java:io:common").sourceSets.test.output
testCompile library.java.junit
testCompile library.java.slf4j_jdk14
testCompile library.java.hamcrest_core
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index f4783442d75e..db5a858c52a6 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -109,7 +109,19 @@
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
     </dependency>
-
+
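+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+    </dependency>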
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/Bird.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/Bird.java
new file mode 100644
index 000000000000..dd52ded0855d
--- /dev/null
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/Bird.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import java.io.Serializable;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Test JAXB annotated class; {@code name} is bound to the {@code species} XML element.
+ * Either field may be {@code null} (e.g. after the no-arg constructor), so {@link #equals} and
+ * {@link #hashCode} are null-safe.
+ */
+@SuppressWarnings("unused")
+@XmlRootElement(name = "bird")
+@XmlType(propOrder = {"name", "adjective"})
+public final class Bird implements Serializable {
+  private String name;
+  private String adjective;
+
+  @XmlElement(name = "species")
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getAdjective() {
+    return adjective;
+  }
+
+  public void setAdjective(String adjective) {
+    this.adjective = adjective;
+  }
+
+  public Bird() {}
+
+  public Bird(String adjective, String name) {
+    this.adjective = adjective;
+    this.name = name;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    // Null-safe comparison; Objects is fully qualified so the import block stays unchanged.
+    Bird bird = (Bird) o;
+    return java.util.Objects.equals(name, bird.name)
+        && java.util.Objects.equals(adjective, bird.adjective);
+  }
+
+  @Override
+  public int hashCode() {
+    // Null-safe: a null field contributes 0 rather than throwing NullPointerException.
+    return 31 * java.util.Objects.hashCode(name) + java.util.Objects.hashCode(adjective);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Bird: %s, %s", name, adjective);
+  }
+}
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
new file mode 100644
index 000000000000..e93578a5ffe6
--- /dev/null
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.charset.Charset;
+import java.util.Date;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DeleteFileFn;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link org.apache.beam.sdk.io.xml.XmlIO}.
+ *
+ * <p>Run these tests using the command below. Pass in connection information via PipelineOptions:
+ *