From f811d7ecdc7ee553355418c754e91c538f792c27 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Gajowy?=
Date: Thu, 11 Jan 2018 16:26:38 +0100
Subject: [PATCH 1/4] [BEAM-3456] Enable jenkins and large scale scenario in JDBC

---
 .../job_beam_PerformanceTests_JDBC.groovy     | 40 ++++++++++++-------
 .../sdk/io/common/IOTestPipelineOptions.java  |  6 +--
 .../apache/beam/sdk/io/common/TestRow.java    |  4 +-
 .../org/apache/beam/sdk/io/avro/AvroIOIT.java |  2 +-
 .../sdk/io/common/FileBasedIOITHelper.java    | 10 ++---
 .../org/apache/beam/sdk/io/text/TextIOIT.java |  5 +--
 .../beam/sdk/io/tfrecord/TFRecordIOIT.java    |  5 +--
 sdks/java/io/jdbc/pom.xml                     |  2 +
 .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 23 +++++------
 9 files changed, 53 insertions(+), 44 deletions(-)

diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
index ef73a261b0c4..10d2db5b2957 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
@@ -23,6 +23,11 @@ job('beam_PerformanceTests_JDBC'){
   // Set default Beam job properties.
   common_job_properties.setTopLevelMainJobProperties(delegate)
 
+  common_job_properties.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java JdbcIO Performance Test',
+      'Run Java JdbcIO Performance Test')
+
   // Run job in postcommit every 6 hours, don't trigger every push, and
   // don't email individual committers.
   common_job_properties.setPostCommit(
@@ -33,14 +38,12 @@ job('beam_PerformanceTests_JDBC'){
       false)
 
   def pipelineArgs = [
-      tempRoot: 'gs://temp-storage-for-end-to-end-tests',
-      project: 'apache-beam-testing',
-      postgresServerName: '10.36.0.11',
-      postgresUsername: 'postgres',
-      postgresDatabaseName: 'postgres',
-      postgresPassword: 'uuinkks',
-      postgresSsl: 'false'
+      project: 'apache-beam-testing',
+      tempRoot: 'gs://temp-storage-for-perf-tests',
+      postgresPort: '5432',
+      numberOfRecords: '5000000'
   ]
+
   def pipelineArgList = []
   pipelineArgs.each({
     key, value -> pipelineArgList.add("--$key=$value")
@@ -48,16 +51,23 @@ job('beam_PerformanceTests_JDBC'){
   def pipelineArgsJoined = pipelineArgList.join(',')
 
   def argMap = [
-      benchmarks: 'beam_integration_benchmark',
-      beam_it_module: 'sdks/java/io/jdbc',
-      beam_it_args: pipelineArgsJoined,
-      beam_it_class: 'org.apache.beam.sdk.io.jdbc.JdbcIOIT',
-      // Profile is located in $BEAM_ROOT/sdks/java/io/pom.xml.
-      beam_it_profile: 'io-it'
+      kubeconfig: '/home/jenkins/.kube/config',
+      beam_it_timeout: '1200',
+      benchmarks: 'beam_integration_benchmark',
+      beam_it_profile: 'io-it',
+      beam_prebuilt: 'true',
+      beam_sdk: 'java',
+      beam_it_module: 'sdks/java/io/jdbc',
+      beam_it_class: 'org.apache.beam.sdk.io.jdbc.JdbcIOIT',
+      beam_it_options: pipelineArgsJoined,
+      beam_kubernetes_scripts: makePathAbsolute('.test-infra/kubernetes/postgres/postgres.yml'),
+      beam_options_config_file: makePathAbsolute('.test-infra/kubernetes/postgres/pkb-config.yml'),
+      bigquery_table: 'beam_performance.JdbcIOIT_pkb_results'
   ]
 
   common_job_properties.buildPerformanceTest(delegate, argMap)
+}
 
-  // [BEAM-2141] Perf tests do not pass.
-  disabled()
+static def makePathAbsolute(String path) {
+  return '"$WORKSPACE/' + path + '"'
 }

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index e7b475d4caaf..b86020ec2784 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -91,10 +91,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   /* Options for test pipeline for file-based I/O in 'sdks/java/io/file-based-io-tests/'. */
 
   @Description("Number records that will be written and read by the test")
-  @Default.Long(100000)
-  Long getNumberOfRecords();
+  @Default.Integer(100000)
+  Integer getNumberOfRecords();
 
-  void setNumberOfRecords(Long count);
+  void setNumberOfRecords(Integer count);
 
   @Description("Destination prefix for files generated by the test")
   @Validation.Required

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index 5f0a2fb00b21..79a144d144d5 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -95,7 +95,9 @@ public void processElement(ProcessContext c) {
    * the name() for the rows generated from seeds in [0, n).
    */
   private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
-      1000, "7d94d63a41164be058a9680002914358"
+      1000, "7d94d63a41164be058a9680002914358",
+      100_000, "c7cbddb319209e200f1c5eebef8fe960",
+      5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
   );
 
   /**

diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index be0d6df2eb7a..07562f38ca33 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -75,7 +75,7 @@ public class AvroIOIT {
       + "}");
 
   private static String filenamePrefix;
-  private static Long numberOfTextLines;
+  private static Integer numberOfTextLines;
 
   @Rule
   public TestPipeline pipeline = TestPipeline.create();

diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index cf20d8e59544..40b04617d8ad 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -55,11 +55,11 @@ public static String appendTimestampToPrefix(String filenamePrefix) {
     return String.format("%s_%s", filenamePrefix, new Date().getTime());
   }
 
-  public static String getExpectedHashForLineCount(Long lineCount) {
-    Map<Long, String> expectedHashes = ImmutableMap.of(
-        100_000L, "4c8bb3b99dcc59459b20fefba400d446",
-        1_000_000L, "9796db06e7a7960f974d5a91164afff1",
-        100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
+  public static String getExpectedHashForLineCount(int lineCount) {
+    Map<Integer, String> expectedHashes = ImmutableMap.of(
+        100_000, "4c8bb3b99dcc59459b20fefba400d446",
+        1_000_000, "9796db06e7a7960f974d5a91164afff1",
+        100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
     );
 
     String hash = expectedHashes.get(lineCount);

diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 1a4ecccc0eff..b611a5746d38 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -23,7 +23,6 @@
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
-import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
@@ -65,14 +64,14 @@ public class TextIOIT {
 
   private static String filenamePrefix;
-  private static Long numberOfTextLines;
+  private static Integer numberOfTextLines;
   private static Compression compressionType;
 
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws ParseException {
+  public static void setup() {
     IOTestPipelineOptions options = readTestPipelineOptions();
 
     numberOfTextLines = options.getNumberOfRecords();

diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 3f08d76750c5..2908c8c2b713 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -23,7 +23,6 @@
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
-import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TFRecordIO;
@@ -67,7 +66,7 @@ public class TFRecordIOIT {
 
   private static String filenamePrefix;
-  private static Long numberOfTextLines;
+  private static Integer numberOfTextLines;
   private static Compression compressionType;
 
   @Rule
@@ -77,7 +76,7 @@ public class TFRecordIOIT {
   public TestPipeline readPipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws ParseException {
+  public static void setup() {
     IOTestPipelineOptions options = readTestPipelineOptions();
 
     numberOfTextLines = options.getNumberOfRecords();

diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index e6bb357a0894..73fbc52590a2 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -122,6 +122,7 @@
                 ${python.interpreter.bin}
                 ${pkbLocation}
+                -beam_it_timeout=1800
                 -benchmarks=beam_integration_benchmark
                 -beam_it_profile=io-it
                 -beam_location=${beamRootProjectDir}
@@ -204,6 +205,7 @@
                 ${python.interpreter.bin}
                 ${pkbLocation}
+                -beam_it_timeout=1800
                 -benchmarks=beam_integration_benchmark
                 -beam_it_profile=io-it
                 -beam_location=${beamRootProjectDir}

diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 32d6d9e80b41..941a77543d4e 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -41,9 +41,6 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.postgresql.ds.PGSimpleDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
@@ -56,7 +53,8 @@
  * "--postgresUsername=postgres",
  * "--postgresDatabaseName=myfancydb",
  * "--postgresPassword=mypass",
- * "--postgresSsl=false" ]'
+ * "--postgresSsl=false",
+ * "--numberOfRecords=1000" ]'
  *
  * If you want to run this with a runner besides directrunner, there are profiles for dataflow
@@ -65,8 +63,8 @@
  */
 @RunWith(JUnit4.class)
 public class JdbcIOIT {
-  private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class);
-  public static final int EXPECTED_ROW_COUNT = 1000;
+
+  private static int numberOfRows;
   private static PGSimpleDataSource dataSource;
   private static String tableName;
@@ -81,14 +79,14 @@ public static void setup() throws SQLException, ParseException {
     IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
         .as(IOTestPipelineOptions.class);
 
+    numberOfRows = options.getNumberOfRecords();
     dataSource = getDataSource(options);
     tableName = JdbcTestHelper.getTableName("IT");
     JdbcTestHelper.createDataTable(dataSource, tableName);
   }
 
-  private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
-      throws SQLException {
+  private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) {
     PGSimpleDataSource dataSource = new PGSimpleDataSource();
 
     dataSource.setDatabaseName(options.getPostgresDatabaseName());
@@ -124,7 +122,7 @@ public void testWriteThenRead() {
    * the database.)
    */
   private void runWrite() {
-    pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT))
+    pipelineWrite.apply(GenerateSequence.from(0).to(numberOfRows))
         .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
         .apply(JdbcIO.write()
             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
@@ -162,13 +160,13 @@ private void runRead() {
 
     PAssert.thatSingleton(
         namesAndIds.apply("Count All", Count.globally()))
-        .isEqualTo((long) EXPECTED_ROW_COUNT);
+        .isEqualTo((long) numberOfRows);
 
     PCollection<String> consolidatedHashcode = namesAndIds
         .apply(ParDo.of(new TestRow.SelectNameFn()))
         .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
 
     PCollection<List<TestRow>> frontOfList = namesAndIds.apply(Top.smallest(500));
@@ -178,8 +176,7 @@ private void runRead() {
 
     PCollection<List<TestRow>> backOfList = namesAndIds.apply(Top.largest(500));
     Iterable<TestRow> expectedBackOfList =
-        TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500,
-            EXPECTED_ROW_COUNT);
+        TestRow.getExpectedValues(numberOfRows - 500, numberOfRows);
     PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
 
     pipelineRead.run().waitUntilFinish();

From 205aee6b9b03c3b34bab9a36586b0dd5c4e34eb6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Gajowy?=
Date: Thu, 11 Jan 2018 17:55:53 +0100
Subject: [PATCH 2/4] fixup! [BEAM-3456] Enable jenkins and large scale scenario in JDBC

---
 .test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
index 10d2db5b2957..5890d0ff86d0 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
@@ -46,9 +46,9 @@ job('beam_PerformanceTests_JDBC'){
 
   def pipelineArgList = []
   pipelineArgs.each({
-    key, value -> pipelineArgList.add("--$key=$value")
+    key, value -> pipelineArgList.add("\"--$key=$value\"")
   })
-  def pipelineArgsJoined = pipelineArgList.join(',')
+  def pipelineArgsJoined = "[" + pipelineArgList.join(',') + "]"
 
   def argMap = [
       kubeconfig: '/home/jenkins/.kube/config',

From 3296bb6d6d711df7a65ee27d43b4d8769778ef89 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Gajowy?=
Date: Fri, 12 Jan 2018 13:17:17 +0100
Subject: [PATCH 3/4] [BEAM-3456] Allow only beam1 jenkins executor which has kubernetes on it

---
 .../jenkins/job_beam_PerformanceTests_JDBC.groovy | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
index 5890d0ff86d0..20b7507872af 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
@@ -65,6 +65,18 @@ job('beam_PerformanceTests_JDBC'){
       bigquery_table: 'beam_performance.JdbcIOIT_pkb_results'
   ]
 
+  // Allow the test to only run on nodes with kubernetes installed.
+  // TODO(INFRA-14819): remove when kubernetes is installed on all Jenkins workers
+  parameters {
+    nodeParam('TEST_HOST') {
+      description('select beam1 test host - only this one has kubernetes installed')
+      defaultNodes(['beam1'])
+      allowedNodes(['beam1'])
+      trigger('multiSelectionDisallowed')
+      eligibility('IgnoreOfflineNodeEligibility')
+    }
+  }
+
   common_job_properties.buildPerformanceTest(delegate, argMap)
 }

From eaef1ed2bac0e4c50a4f19edf4fc1174f9118aad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Gajowy?=
Date: Wed, 17 Jan 2018 14:15:39 +0100
Subject: [PATCH 4/4] [BEAM-3456] Revert Jdbc Performance test job enabling

The kubernetes infrastructure that is needed for the Jenkins job to run
is not available for now. We should add it once the infrastructure is
there.
---
 .../job_beam_PerformanceTests_JDBC.groovy     | 56 ++++++-------------
 1 file changed, 17 insertions(+), 39 deletions(-)

diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
index 20b7507872af..ef73a261b0c4 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy
@@ -23,11 +23,6 @@ job('beam_PerformanceTests_JDBC'){
   // Set default Beam job properties.
   common_job_properties.setTopLevelMainJobProperties(delegate)
 
-  common_job_properties.enablePhraseTriggeringFromPullRequest(
-      delegate,
-      'Java JdbcIO Performance Test',
-      'Run Java JdbcIO Performance Test')
-
   // Run job in postcommit every 6 hours, don't trigger every push, and
   // don't email individual committers.
   common_job_properties.setPostCommit(
@@ -38,48 +33,31 @@ job('beam_PerformanceTests_JDBC'){
       false)
 
   def pipelineArgs = [
-      project: 'apache-beam-testing',
-      tempRoot: 'gs://temp-storage-for-perf-tests',
-      postgresPort: '5432',
-      numberOfRecords: '5000000'
+      tempRoot: 'gs://temp-storage-for-end-to-end-tests',
+      project: 'apache-beam-testing',
+      postgresServerName: '10.36.0.11',
+      postgresUsername: 'postgres',
+      postgresDatabaseName: 'postgres',
+      postgresPassword: 'uuinkks',
+      postgresSsl: 'false'
   ]
-
   def pipelineArgList = []
   pipelineArgs.each({
-    key, value -> pipelineArgList.add("\"--$key=$value\"")
+    key, value -> pipelineArgList.add("--$key=$value")
   })
-  def pipelineArgsJoined = "[" + pipelineArgList.join(',') + "]"
+  def pipelineArgsJoined = pipelineArgList.join(',')
 
   def argMap = [
-      kubeconfig: '/home/jenkins/.kube/config',
-      beam_it_timeout: '1200',
-      benchmarks: 'beam_integration_benchmark',
-      beam_it_profile: 'io-it',
-      beam_prebuilt: 'true',
-      beam_sdk: 'java',
-      beam_it_module: 'sdks/java/io/jdbc',
-      beam_it_class: 'org.apache.beam.sdk.io.jdbc.JdbcIOIT',
-      beam_it_options: pipelineArgsJoined,
-      beam_kubernetes_scripts: makePathAbsolute('.test-infra/kubernetes/postgres/postgres.yml'),
-      beam_options_config_file: makePathAbsolute('.test-infra/kubernetes/postgres/pkb-config.yml'),
-      bigquery_table: 'beam_performance.JdbcIOIT_pkb_results'
+      benchmarks: 'beam_integration_benchmark',
+      beam_it_module: 'sdks/java/io/jdbc',
+      beam_it_args: pipelineArgsJoined,
+      beam_it_class: 'org.apache.beam.sdk.io.jdbc.JdbcIOIT',
+      // Profile is located in $BEAM_ROOT/sdks/java/io/pom.xml.
+      beam_it_profile: 'io-it'
   ]
 
-  // Allow the test to only run on nodes with kubernetes installed.
-  // TODO(INFRA-14819): remove when kubernetes is installed on all Jenkins workers
-  parameters {
-    nodeParam('TEST_HOST') {
-      description('select beam1 test host - only this one has kubernetes installed')
-      defaultNodes(['beam1'])
-      allowedNodes(['beam1'])
-      trigger('multiSelectionDisallowed')
-      eligibility('IgnoreOfflineNodeEligibility')
-    }
-  }
-
   common_job_properties.buildPerformanceTest(delegate, argMap)
-}
 
-static def makePathAbsolute(String path) {
-  return '"$WORKSPACE/' + path + '"'
+  // [BEAM-2141] Perf tests do not pass.
+  disabled()
 }
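
Note (not part of the patch series above): the standalone Groovy sketch below illustrates the option-serialization change introduced by the "fixup!" commit. Each --key=value pipeline option is wrapped in escaped quotes and the whole list is joined inside brackets, which is the shape the beam_it_options flag passed to PerfKit Benchmarker is assumed to expect. The pipelineArgs values are copied from the job definition; the printed result in the trailing comment is simply what this logic produces, not output captured from a real Jenkins run.

// Standalone sketch of the serialization logic from
// .test-infra/jenkins/job_beam_PerformanceTests_JDBC.groovy (after the fixup commit).
def pipelineArgs = [
    project        : 'apache-beam-testing',
    tempRoot       : 'gs://temp-storage-for-perf-tests',
    postgresPort   : '5432',
    numberOfRecords: '5000000'
]

def pipelineArgList = []
pipelineArgs.each({ key, value ->
  // Wrap each option in escaped quotes so PKB receives it as a single token.
  pipelineArgList.add("\"--$key=$value\"")
})
// Join the quoted options into a bracketed, JSON-style list.
def pipelineArgsJoined = "[" + pipelineArgList.join(',') + "]"

println pipelineArgsJoined
// Prints:
// ["--project=apache-beam-testing","--tempRoot=gs://temp-storage-for-perf-tests","--postgresPort=5432","--numberOfRecords=5000000"]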