Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def testsConfigurations = [
bqTable : 'beam_performance.textioit_pkb_results',
prCommitStatusName: 'Java TextIO Performance Test',
prTriggerPhase : 'Run Java TextIO Performance Test',
extraPipelineArgs: [
numberOfRecords: '1000000'
]

],
[
Expand All @@ -36,6 +39,7 @@ def testsConfigurations = [
prCommitStatusName : 'Java CompressedTextIO Performance Test',
prTriggerPhase : 'Run Java CompressedTextIO Performance Test',
extraPipelineArgs: [
numberOfRecords: '1000000',
compressionType: 'GZIP'
]
],
Expand All @@ -46,6 +50,9 @@ def testsConfigurations = [
bqTable : 'beam_performance.avroioit_pkb_results',
prCommitStatusName: 'Java AvroIO Performance Test',
prTriggerPhase : 'Run Java AvroIO Performance Test',
extraPipelineArgs: [
numberOfRecords: '1000000'
]
],
[
jobName : 'beam_PerformanceTests_TFRecordIOIT',
Expand All @@ -54,7 +61,22 @@ def testsConfigurations = [
bqTable : 'beam_performance.tfrecordioit_pkb_results',
prCommitStatusName: 'Java TFRecordIO Performance Test',
prTriggerPhase : 'Run Java TFRecordIO Performance Test',
extraPipelineArgs: [
numberOfRecords: '1000000'
]
],
// PerfKit configuration for XmlIOIT. It requests 100x more records than the
// other file-based IO tests in this list and passes the XML charset explicitly.
[
jobName : 'beam_PerformanceTests_XmlIOIT',
jobDescription : 'Runs PerfKit tests for beam_PerformanceTests_XmlIOIT',
itClass : 'org.apache.beam.sdk.io.xml.XmlIOIT',
bqTable : 'beam_performance.xmlioit_pkb_results',
// Spaced like the sibling status names, e.g. 'Java AvroIO Performance Test'.
prCommitStatusName: 'Java XmlIO Performance Test',
prTriggerPhase : 'Run Java XmlIO Performance Test',
extraPipelineArgs: [
numberOfRecords: '100000000',
charset: 'UTF-8'
]
]
]

for (testConfiguration in testsConfigurations) {
Expand Down Expand Up @@ -89,7 +111,6 @@ private void create_filebasedio_performance_test_job(testConfiguration) {
def pipelineArgs = [
project : 'apache-beam-testing',
tempRoot : 'gs://temp-storage-for-perf-tests',
numberOfRecords: '1000000',
filenamePrefix : "gs://temp-storage-for-perf-tests/${testConfiguration.jobName}/\${BUILD_ID}/",
]
if (testConfiguration.containsKey('extraPipelineArgs')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,11 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
String getCompressionType();

void setCompressionType(String compressionType);

/* Used by XmlIOIT */
/**
 * Returns the name of the charset for XML files.
 *
 * <p>The declared default value is {@code "UTF-8"}.
 */
@Description("Xml file charset name")
@Default.String("UTF-8")
String getCharset();

/**
 * Sets the name of the charset for XML files.
 *
 * @param charset charset name, e.g. {@code "UTF-8"}
 */
void setCharset(String charset);
}
1 change: 1 addition & 0 deletions sdks/java/io/file-based-io-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
shadowTest project(":runners:direct-java").sourceSets.test.output
shadowTest project(":sdks:java:io:common")
shadowTest project(":sdks:java:io:common").sourceSets.test.output
shadowTest project(":sdks:java:io:xml")
shadowTest library.java.guava
shadowTest library.java.junit
shadowTest library.java.hamcrest_core
Expand Down
6 changes: 5 additions & 1 deletion sdks/java/io/file-based-io-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@
<artifactId>avro</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-xml</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.io.avro;

import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;

Expand All @@ -28,6 +28,7 @@
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
Expand Down Expand Up @@ -85,7 +86,7 @@ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();

numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
}

@Test
Expand Down Expand Up @@ -119,7 +120,7 @@ public void writeThenReadAll() {

testFilenames.apply(
"Delete test files",
ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));

pipeline.run().waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.io.common;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -51,8 +50,8 @@ public static IOTestPipelineOptions readTestPipelineOptions() {
return PipelineOptionsValidator.validate(IOTestPipelineOptions.class, options);
}

public static String appendTimestampToPrefix(String filenamePrefix) {
return String.format("%s_%s", filenamePrefix, new Date().getTime());
public static String appendTimestampSuffix(String text) {
return String.format("%s_%s", text, new Date().getTime());
}

public static String getExpectedHashForLineCount(int lineCount) {
Expand All @@ -62,10 +61,14 @@ public static String getExpectedHashForLineCount(int lineCount) {
100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
);

String hash = expectedHashes.get(lineCount);
return getHashForRecordCount(lineCount, expectedHashes);
}

public static String getHashForRecordCount(int recordCount, Map<Integer, String> hashes) {
String hash = hashes.get(recordCount);
if (hash == null) {
throw new UnsupportedOperationException(
String.format("No hash for that line count: %s", lineCount)
String.format("No hash for that record count: %s", recordCount)
);
}
return hash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
package org.apache.beam.sdk.io.text;

import static org.apache.beam.sdk.io.Compression.AUTO;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;

import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();

numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
compressionType = Compression.valueOf(options.getCompressionType());
}

Expand Down Expand Up @@ -107,7 +108,7 @@ public void writeThenReadAll() {

testFilenames.apply(
"Delete test files",
ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));

pipeline.run().waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
package org.apache.beam.sdk.io.tfrecord;

import static org.apache.beam.sdk.io.Compression.AUTO;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;

import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
Expand Down Expand Up @@ -80,7 +81,7 @@ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();

numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
compressionType = Compression.valueOf(options.getCompressionType());
}

Expand Down Expand Up @@ -121,7 +122,7 @@ public void writeThenReadAll() {
.apply(Create.of(filenamePattern))
.apply(
"Delete test files",
ParDo.of(new FileBasedIOITHelper.DeleteFileFn())
ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
readPipeline.run().waitUntilFinish();
}
Expand Down
Loading