diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_FileBasedIO_IT.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_FileBasedIO_IT.groovy
index 667b11d20725..117ff40d8952 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_FileBasedIO_IT.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_FileBasedIO_IT.groovy
@@ -26,6 +26,9 @@ def testsConfigurations = [
bqTable : 'beam_performance.textioit_pkb_results',
prCommitStatusName: 'Java TextIO Performance Test',
prTriggerPhase : 'Run Java TextIO Performance Test',
+ extraPipelineArgs: [
+ numberOfRecords: '1000000'
+ ]
],
[
@@ -36,6 +39,7 @@ def testsConfigurations = [
prCommitStatusName : 'Java CompressedTextIO Performance Test',
prTriggerPhase : 'Run Java CompressedTextIO Performance Test',
extraPipelineArgs: [
+ numberOfRecords: '1000000',
compressionType: 'GZIP'
]
],
@@ -46,6 +50,9 @@ def testsConfigurations = [
bqTable : 'beam_performance.avroioit_pkb_results',
prCommitStatusName: 'Java AvroIO Performance Test',
prTriggerPhase : 'Run Java AvroIO Performance Test',
+ extraPipelineArgs: [
+ numberOfRecords: '1000000'
+ ]
],
[
jobName : 'beam_PerformanceTests_TFRecordIOIT',
@@ -54,7 +61,22 @@ def testsConfigurations = [
bqTable : 'beam_performance.tfrecordioit_pkb_results',
prCommitStatusName: 'Java TFRecordIO Performance Test',
prTriggerPhase : 'Run Java TFRecordIO Performance Test',
+ extraPipelineArgs: [
+ numberOfRecords: '1000000'
+ ]
],
+ [
+ jobName : 'beam_PerformanceTests_XmlIOIT',
+ jobDescription : 'Runs PerfKit tests for beam_PerformanceTests_XmlIOIT',
+ itClass : 'org.apache.beam.sdk.io.xml.XmlIOIT',
+ bqTable : 'beam_performance.xmlioit_pkb_results',
+ prCommitStatusName: 'Java XmlIOPerformance Test',
+ prTriggerPhase : 'Run Java XmlIO Performance Test',
+ extraPipelineArgs: [
+ numberOfRecords: '100000000',
+ charset: 'UTF-8'
+ ]
+ ]
]
for (testConfiguration in testsConfigurations) {
@@ -89,7 +111,6 @@ private void create_filebasedio_performance_test_job(testConfiguration) {
def pipelineArgs = [
project : 'apache-beam-testing',
tempRoot : 'gs://temp-storage-for-perf-tests',
- numberOfRecords: '1000000',
filenamePrefix : "gs://temp-storage-for-perf-tests/${testConfiguration.jobName}/\${BUILD_ID}/",
]
if (testConfiguration.containsKey('extraPipelineArgs')) {
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index b86020ec2784..89b7ae81bc5a 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -107,4 +107,11 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
String getCompressionType();
void setCompressionType(String compressionType);
+
+ /* Used by XmlIOIT */
+ @Description("Xml file charset name")
+ @Default.String("UTF-8")
+ String getCharset();
+
+ void setCharset(String charset);
}
diff --git a/sdks/java/io/file-based-io-tests/build.gradle b/sdks/java/io/file-based-io-tests/build.gradle
index b1a9167e73e3..e797172850a7 100644
--- a/sdks/java/io/file-based-io-tests/build.gradle
+++ b/sdks/java/io/file-based-io-tests/build.gradle
@@ -35,6 +35,7 @@ dependencies {
shadowTest project(":runners:direct-java").sourceSets.test.output
shadowTest project(":sdks:java:io:common")
shadowTest project(":sdks:java:io:common").sourceSets.test.output
+ shadowTest project(":sdks:java:io:xml")
shadowTest library.java.guava
shadowTest library.java.junit
shadowTest library.java.hamcrest_core
diff --git a/sdks/java/io/file-based-io-tests/pom.xml b/sdks/java/io/file-based-io-tests/pom.xml
index 740def811bb7..369f2bc9fde9 100644
--- a/sdks/java/io/file-based-io-tests/pom.xml
+++ b/sdks/java/io/file-based-io-tests/pom.xml
@@ -349,6 +349,10 @@
avro
test
-
+
+ org.apache.beam
+ beam-sdks-java-io-xml
+ test
+
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index f93d4dc4cba3..19f4a68b5ff0 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.io.avro;
-import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
@@ -85,7 +86,7 @@ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
- filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+ filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
}
@Test
@@ -119,7 +120,7 @@ public void writeThenReadAll() {
testFilenames.apply(
"Delete test files",
- ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index 40b04617d8ad..bbf707e26dd7 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.beam.sdk.io.common;
import com.google.common.collect.ImmutableMap;
@@ -51,8 +50,8 @@ public static IOTestPipelineOptions readTestPipelineOptions() {
return PipelineOptionsValidator.validate(IOTestPipelineOptions.class, options);
}
- public static String appendTimestampToPrefix(String filenamePrefix) {
- return String.format("%s_%s", filenamePrefix, new Date().getTime());
+ public static String appendTimestampSuffix(String text) {
+ return String.format("%s_%s", text, new Date().getTime());
}
public static String getExpectedHashForLineCount(int lineCount) {
@@ -62,10 +61,14 @@ public static String getExpectedHashForLineCount(int lineCount) {
100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
);
- String hash = expectedHashes.get(lineCount);
+ return getHashForRecordCount(lineCount, expectedHashes);
+ }
+
+ public static String getHashForRecordCount(int recordCount, Map hashes) {
+ String hash = hashes.get(recordCount);
if (hash == null) {
throw new UnsupportedOperationException(
- String.format("No hash for that line count: %s", lineCount)
+ String.format("No hash for that record count: %s", recordCount)
);
}
return hash;
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 3e67f6ab5803..f6e27cdb3564 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.io.text;
import static org.apache.beam.sdk.io.Compression.AUTO;
-import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
@@ -75,7 +76,7 @@ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
- filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+ filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
compressionType = Compression.valueOf(options.getCompressionType());
}
@@ -107,7 +108,7 @@ public void writeThenReadAll() {
testFilenames.apply(
"Delete test files",
- ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 99d23e0f432d..1ce155912ca1 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.io.tfrecord;
import static org.apache.beam.sdk.io.Compression.AUTO;
-import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
@@ -80,7 +81,7 @@ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
- filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+ filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
compressionType = Compression.valueOf(options.getCompressionType());
}
@@ -121,7 +122,7 @@ public void writeThenReadAll() {
.apply(Create.of(filenamePattern))
.apply(
"Delete test files",
- ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
readPipeline.run().waitUntilFinish();
}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
new file mode 100644
index 000000000000..7176d7f42da0
--- /dev/null
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
+import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getHashForRecordCount;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Map;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link org.apache.beam.sdk.io.xml.XmlIO}.
+ *
+ * Run those tests using the command below. Pass in connection information via PipelineOptions:
+ *
+ * mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ * -Dit.test=org.apache.beam.sdk.io.xml.XmlIOIT
+ * -DintegrationTestPipelineOptions='[
+ * "--numberOfRecords=100000",
+ * "--filenamePrefix=output_file_path",
+ * "--charset=UTF-8",
+ * ]'
+ *
+ *
+ */
+@RunWith(JUnit4.class)
+public class XmlIOIT {
+
+ private static final Map EXPECTED_HASHES = ImmutableMap.of(
+ 1000, "7f51adaf701441ee83459a3f705c1b86",
+ 100_000, "af7775de90d0b0c8bbc36273fbca26fe",
+ 100_000_000, "bfee52b33aa1552b9c1bfa8bcc41ae80"
+ );
+
+ private static Integer numberOfRecords;
+
+ private static String filenamePrefix;
+
+ private static Charset charset;
+
+ @Rule
+ public TestPipeline pipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void setup() {
+ PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+ IOTestPipelineOptions options = TestPipeline
+ .testingPipelineOptions()
+ .as(IOTestPipelineOptions.class);
+
+ filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
+ numberOfRecords = options.getNumberOfRecords();
+ charset = Charset.forName(options.getCharset());
+ }
+
+ @Test
+ public void writeThenReadAll() {
+ PCollection testFileNames = pipeline
+ .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords))
+ .apply("Create xml records", MapElements.via(new LongToBird()))
+ .apply("Write xml files", FileIO.write()
+ .via(XmlIO.sink(Bird.class)
+ .withRootElement("birds")
+ .withCharset(charset))
+ .to(filenamePrefix)
+ .withPrefix("birds")
+ .withSuffix(".xml"))
+ .getPerDestinationOutputFilenames()
+ .apply("Prevent fusion", Reshuffle.viaRandomKey())
+ .apply("Get file names", Values.create());
+
+ PCollection birds = testFileNames
+ .apply("Find files", FileIO.matchAll())
+ .apply("Read matched files", FileIO.readMatches())
+ .apply("Read xml files", XmlIO.readFiles()
+ .withRecordClass(Bird.class).withRootElement("birds")
+ .withRecordElement("bird")
+ .withCharset(charset));
+
+ PCollection consolidatedHashcode = birds
+ .apply("Map xml records to strings", MapElements.via(new BirdToString()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+ String expectedHash = getHashForRecordCount(numberOfRecords, EXPECTED_HASHES);
+ PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+ testFileNames.apply("Delete test files", ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
+ .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ private static class LongToBird extends SimpleFunction {
+ @Override
+ public Bird apply(Long input) {
+ return new Bird("Testing", "Bird number " + input);
+ }
+ }
+
+ private static class BirdToString extends SimpleFunction {
+ @Override
+ public String apply(Bird input) {
+ return input.toString();
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @XmlRootElement(name = "bird")
+ @XmlType(propOrder = { "name", "adjective" })
+ private static final class Bird implements Serializable {
+ private String name;
+ private String adjective;
+
+ @XmlElement(name = "species")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getAdjective() {
+ return adjective;
+ }
+
+ public void setAdjective(String adjective) {
+ this.adjective = adjective;
+ }
+
+ public Bird() {}
+
+ public Bird(String adjective, String name) {
+ this.adjective = adjective;
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Bird bird = (Bird) o;
+
+ if (!name.equals(bird.name)) {
+ return false;
+ }
+ return adjective.equals(bird.adjective);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + adjective.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Bird: %s, %s", name, adjective);
+ }
+ }
+}
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index f4783442d75e..996063a227ce 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -109,7 +109,6 @@
hamcrest-library
test
-
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java
index f6870e12a614..cb083859b011 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java
@@ -214,7 +214,7 @@ public void testWriteDisplayData() {
*/
@SuppressWarnings("unused")
@XmlRootElement(name = "bird")
- @XmlType(propOrder = {"name", "adjective"})
+ @XmlType(propOrder = { "name", "adjective" })
private static final class Bird implements Serializable {
private String name;
private String adjective;
@@ -266,5 +266,10 @@ public int hashCode() {
result = 31 * result + adjective.hashCode();
return result;
}
+
+ @Override
+ public String toString() {
+ return String.format("Bird: %s, %s", name, adjective);
+ }
}
}