diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index e7b475d4caaf..b86020ec2784 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -91,10 +91,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { /* Options for test pipeline for file-based I/O in 'sdks/java/io/file-based-io-tests/'. */ @Description("Number records that will be written and read by the test") - @Default.Long(100000) - Long getNumberOfRecords(); + @Default.Integer(100000) + Integer getNumberOfRecords(); - void setNumberOfRecords(Long count); + void setNumberOfRecords(Integer count); @Description("Destination prefix for files generated by the test") @Validation.Required diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java index 5f0a2fb00b21..79a144d144d5 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java @@ -95,7 +95,9 @@ public void processElement(ProcessContext c) { * the name() for the rows generated from seeds in [0, n). */ private static final Map EXPECTED_HASHES = ImmutableMap.of( - 1000, "7d94d63a41164be058a9680002914358" + 1000, "7d94d63a41164be058a9680002914358", + 100_000, "c7cbddb319209e200f1c5eebef8fe960", + 5_000_000, "c44f8a5648cd9207c9c6f77395a998dc" ); /** diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java index be0d6df2eb7a..07562f38ca33 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java @@ -75,7 +75,7 @@ public class AvroIOIT { + "}"); private static String filenamePrefix; - private static Long numberOfTextLines; + private static Integer numberOfTextLines; @Rule public TestPipeline pipeline = TestPipeline.create(); diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java index cf20d8e59544..40b04617d8ad 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java @@ -55,11 +55,11 @@ public static String appendTimestampToPrefix(String filenamePrefix) { return String.format("%s_%s", filenamePrefix, new Date().getTime()); } - public static String getExpectedHashForLineCount(Long lineCount) { - Map expectedHashes = ImmutableMap.of( - 100_000L, "4c8bb3b99dcc59459b20fefba400d446", - 1_000_000L, "9796db06e7a7960f974d5a91164afff1", - 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95" + public static String getExpectedHashForLineCount(int lineCount) { + Map expectedHashes = ImmutableMap.of( + 100_000, "4c8bb3b99dcc59459b20fefba400d446", + 1_000_000, "9796db06e7a7960f974d5a91164afff1", + 100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95" ); String hash = expectedHashes.get(lineCount); diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index 1a4ecccc0eff..b611a5746d38 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -23,7 +23,6 @@ import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount; import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions; -import java.text.ParseException; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; @@ -65,14 +64,14 @@ public class TextIOIT { private static String filenamePrefix; - private static Long numberOfTextLines; + private static Integer numberOfTextLines; private static Compression compressionType; @Rule public TestPipeline pipeline = TestPipeline.create(); @BeforeClass - public static void setup() throws ParseException { + public static void setup() { IOTestPipelineOptions options = readTestPipelineOptions(); numberOfTextLines = options.getNumberOfRecords(); diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java index 3f08d76750c5..2908c8c2b713 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java @@ -23,7 +23,6 @@ import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount; import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions; -import java.text.ParseException; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TFRecordIO; @@ -67,7 +66,7 @@ public class TFRecordIOIT { private static String filenamePrefix; - private static Long numberOfTextLines; + private static Integer numberOfTextLines; private static Compression compressionType; @Rule @@ -77,7 +76,7 @@ public class TFRecordIOIT { public TestPipeline readPipeline = TestPipeline.create(); @BeforeClass - public static void setup() throws ParseException { + public static void setup() { IOTestPipelineOptions options = readTestPipelineOptions(); numberOfTextLines = options.getNumberOfRecords(); diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml index e6bb357a0894..73fbc52590a2 100644 --- a/sdks/java/io/jdbc/pom.xml +++ b/sdks/java/io/jdbc/pom.xml @@ -122,6 +122,7 @@ ${python.interpreter.bin} ${pkbLocation} + -beam_it_timeout=1800 -benchmarks=beam_integration_benchmark -beam_it_profile=io-it -beam_location=${beamRootProjectDir} @@ -204,6 +205,7 @@ ${python.interpreter.bin} ${pkbLocation} + -beam_it_timeout=1800 -benchmarks=beam_integration_benchmark -beam_it_profile=io-it -beam_location=${beamRootProjectDir} diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java index 32d6d9e80b41..941a77543d4e 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java @@ -41,9 +41,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.postgresql.ds.PGSimpleDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance. @@ -56,7 +53,8 @@ * "--postgresUsername=postgres", * "--postgresDatabaseName=myfancydb", * "--postgresPassword=mypass", - * "--postgresSsl=false" ]' + * "--postgresSsl=false", + * "--numberOfRecords=1000" ]' * * *

If you want to run this with a runner besides directrunner, there are profiles for dataflow @@ -65,8 +63,8 @@ */ @RunWith(JUnit4.class) public class JdbcIOIT { - private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class); - public static final int EXPECTED_ROW_COUNT = 1000; + + private static int numberOfRows; private static PGSimpleDataSource dataSource; private static String tableName; @@ -81,14 +79,14 @@ public static void setup() throws SQLException, ParseException { IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() .as(IOTestPipelineOptions.class); + numberOfRows = options.getNumberOfRecords(); dataSource = getDataSource(options); tableName = JdbcTestHelper.getTableName("IT"); JdbcTestHelper.createDataTable(dataSource, tableName); } - private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) - throws SQLException { + private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) { PGSimpleDataSource dataSource = new PGSimpleDataSource(); dataSource.setDatabaseName(options.getPostgresDatabaseName()); @@ -124,7 +122,7 @@ public void testWriteThenRead() { * the database.) */ private void runWrite() { - pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT)) + pipelineWrite.apply(GenerateSequence.from(0).to(numberOfRows)) .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) .apply(JdbcIO.write() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) @@ -162,13 +160,13 @@ private void runRead() { PAssert.thatSingleton( namesAndIds.apply("Count All", Count.globally())) - .isEqualTo((long) EXPECTED_ROW_COUNT); + .isEqualTo((long) numberOfRows); PCollection consolidatedHashcode = namesAndIds .apply(ParDo.of(new TestRow.SelectNameFn())) .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()); PAssert.that(consolidatedHashcode) - .containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT)); + .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows)); PCollection> frontOfList = namesAndIds.apply(Top.smallest(500)); @@ -178,8 +176,7 @@ private void runRead() { PCollection> backOfList = namesAndIds.apply(Top.largest(500)); Iterable expectedBackOfList = - TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500, - EXPECTED_ROW_COUNT); + TestRow.getExpectedValues(numberOfRows - 500, numberOfRows); PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList); pipelineRead.run().waitUntilFinish();