From 138b0e00d1ea86cd27700083ee657e8c5abae4a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Fri, 29 Dec 2017 15:48:24 +0100 Subject: [PATCH 1/4] [BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat --- sdks/java/io/common/pom.xml | 5 + .../sdk/io/common/DatabaseTestHelper.java | 78 ++++++++++ .../apache/beam/sdk/io/common/TestRow.java | 4 +- sdks/java/io/hadoop/input-format/pom.xml | 139 ++++++++++++++++++ .../inputformat/HadoopInputFormatIOIT.java | 133 +++++++++++++++++ .../hadoop/inputformat/TestRowDBWritable.java | 83 +++++++++++ sdks/java/io/jdbc/pom.xml | 1 - .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 27 +--- .../apache/beam/sdk/io/jdbc/JdbcIOTest.java | 13 +- .../beam/sdk/io/jdbc/JdbcTestHelper.java | 35 +---- sdks/java/io/pom.xml | 11 ++ 11 files changed, 468 insertions(+), 61 deletions(-) create mode 100644 sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java create mode 100644 sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java create mode 100644 sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml index eb79091b753e..bd143d4dbb14 100644 --- a/sdks/java/io/common/pom.xml +++ b/sdks/java/io/common/pom.xml @@ -48,5 +48,10 @@ junit test + + org.postgresql + postgresql + test + diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java new file mode 100644 index 000000000000..7a08f465424d --- /dev/null +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.common; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.sql.DataSource; +import org.postgresql.ds.PGSimpleDataSource; + +/** + * This class contains helper methods to ease database usage in tests. 
+ */ +public class DatabaseTestHelper { + + public static PGSimpleDataSource getPostgresDataSource(IOTestPipelineOptions options) { + PGSimpleDataSource dataSource = new PGSimpleDataSource(); + dataSource.setDatabaseName(options.getPostgresDatabaseName()); + dataSource.setServerName(options.getPostgresServerName()); + dataSource.setPortNumber(options.getPostgresPort()); + dataSource.setUser(options.getPostgresUsername()); + dataSource.setPassword(options.getPostgresPassword()); + dataSource.setSsl(options.getPostgresSsl()); + return dataSource; + } + + public static void createDataTable(DataSource dataSource, String tableName) + throws SQLException { + try (Connection connection = dataSource.getConnection()) { + try (Statement statement = connection.createStatement()) { + statement.execute( + String.format("create table %s (id INT, name VARCHAR(500))", tableName)); + } + } + } + + public static void cleanUpDataTable(DataSource dataSource, String tableName) + throws SQLException { + if (tableName != null) { + try (Connection connection = dataSource.getConnection(); + Statement statement = connection.createStatement()) { + statement.executeUpdate(String.format("drop table %s", tableName)); + } + } + } + + public static String getTestTableName(String testIdentifier) { + SimpleDateFormat formatter = new SimpleDateFormat(); + formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S"); + return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date())); + } + + public static String getPostgresDBUrl(IOTestPipelineOptions options) { + return String.format( + "jdbc:postgresql://%s:%s/%s", + options.getPostgresServerName(), + options.getPostgresPort(), + options.getPostgresDatabaseName() + ); + } +} diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java index 5f0a2fb00b21..1e3a45dc32a1 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java @@ -95,7 +95,9 @@ public void processElement(ProcessContext c) { * the name() for the rows generated from seeds in [0, n). */ private static final Map EXPECTED_HASHES = ImmutableMap.of( - 1000, "7d94d63a41164be058a9680002914358" + 1000, "7d94d63a41164be058a9680002914358", + 100_000, "c7cbddb319209e200f1c5eebef8fe960", + 1_000_000, "205893d26b6b9753e3df62d52ad419f6" ); /** diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml index c698b40f8b31..16e0b76915e6 100644 --- a/sdks/java/io/hadoop/input-format/pom.xml +++ b/sdks/java/io/hadoop/input-format/pom.xml @@ -27,6 +27,124 @@ Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format IO to read data from data sources which implement Hadoop Input Format. 
+ + + + dataflow-runner + + + integrationTestRunner + dataflow + + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + runtime + + + + + + + io-it-suite + + io-it-suite + + + + ${project.parent.parent.parent.parent.parent.basedir} + + + + + org.codehaus.gmaven + groovy-maven-plugin + ${groovy-maven-plugin.version} + + + find-supported-python-for-compile + initialize + + execute + + + ${beamRootProjectDir}/sdks/python/findSupportedPython.groovy + + + + + + + org.codehaus.mojo + exec-maven-plugin + ${maven-exec-plugin.version} + + + verify + + exec + + + + + ${python.interpreter.bin} + + ${pkbLocation} + -benchmarks=beam_integration_benchmark + -beam_it_profile=io-it + -beam_location=${beamRootProjectDir} + -beam_prebuilt=true + -beam_sdk=java + -kubeconfig=${kubeconfig} + -kubectl=${kubectl} + + ${pkbBeamRunnerProfile} + ${pkbBeamRunnerOption} + + -beam_it_module=sdks/java/io/hadoop/input-format + -beam_it_class=${ioItClass} + -beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config-local.yml + -beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml,${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml + + -beam_it_options=${integrationTestPipelineOptions} + + -beam_extra_mvn_properties=${pkbExtraProperties} + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire-plugin.version} + + true + + + + + + + com.google.guava @@ -69,10 +187,31 @@ tests test + + org.apache.beam + beam-sdks-java-io-common + test + tests + + + org.apache.beam + beam-sdks-java-io-common + test + org.hamcrest hamcrest-all test + + org.apache.beam + beam-sdks-java-io-jdbc + test + + + org.postgresql + postgresql + test + \ No newline at end of file diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java new file mode 100644 index 000000000000..e8a036bc8b59 --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import static org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn; +import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn; +import static org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable.PrepareStatementFromTestRow; + +import java.sql.SQLException; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.DatabaseTestHelper; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.postgresql.ds.PGSimpleDataSource; + +/** + * IOIT for {@link org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO}. + */ +public class HadoopInputFormatIOIT { + + private static PGSimpleDataSource dataSource; + private static Long numberOfRows; + private static String tableName; + private static SerializableConfiguration hadoopConfiguration; + + @Rule + public TestPipeline writePipeline = TestPipeline.create(); + + @Rule + public TestPipeline readPipeline = TestPipeline.create(); + + @BeforeClass + public static void setUp() throws SQLException { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() + .as(IOTestPipelineOptions.class); + + dataSource = DatabaseTestHelper.getPostgresDataSource(options); + numberOfRows = options.getNumberOfRecords(); + tableName = DatabaseTestHelper.getTestTableName("HadoopInputFormatIOIT"); + + DatabaseTestHelper.createDataTable(dataSource, tableName); + setupHadoopConfiguration(options); + } + + private static void setupHadoopConfiguration(IOTestPipelineOptions options) { + Configuration conf = new Configuration(); + DBConfiguration.configureDB( + conf, + "org.postgresql.Driver", + DatabaseTestHelper.getPostgresDBUrl(options), + options.getPostgresUsername(), + options.getPostgresPassword() + ); + conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); + conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name"); + conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class, DBWritable.class); + + conf.setClass("key.class", LongWritable.class, Object.class); + conf.setClass("value.class", TestRowDBWritable.class, Object.class); + conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class); + + hadoopConfiguration = new SerializableConfiguration(conf); + } + + @AfterClass + public static void tearDown() throws SQLException { + DatabaseTestHelper.cleanUpDataTable(dataSource, 
tableName); + } + + @Test + public void writeThenReadUsingDBInputFormat() { + writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows)) + .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn())) + .apply(JdbcIO.write() + .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) + .withStatement(String.format("insert into %s values(?, ?)", tableName)) + .withPreparedStatementSetter(new PrepareStatementFromTestRow())); + + + writePipeline.run().waitUntilFinish(); + + PCollection consolidatedHashcode = readPipeline + .apply("Read using DBInputFormat", HadoopInputFormatIO + .read() + .withConfiguration(hadoopConfiguration.get())) + .apply("Get values only", Values.create()) + .apply("Values as string", ParDo.of(new SelectNameFn())) + .apply("Calculate hashcode", Combine.globally(new HashingFn())) + .apply(Reshuffle.viaRandomKey()); + + PAssert.thatSingleton(consolidatedHashcode) + .isEqualTo(TestRow.getExpectedHashForRowCount(numberOfRows.intValue())); + + readPipeline.run().waitUntilFinish(); + } +} diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java new file mode 100644 index 000000000000..c9b2639b1922 --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * A subclass of {@link org.apache.beam.sdk.io.common.TestRow} to be used with + * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}. 
+ */ +@DefaultCoder(AvroCoder.class) +public class TestRowDBWritable extends TestRow implements DBWritable, Writable { + + private Integer id; + private String name; + + @Override public Integer id() { + return id; + } + + public String name() { + return name; + } + + @Override + public void write(PreparedStatement statement) throws SQLException { + statement.setInt(1, id); + statement.setString(2, name); + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + id = resultSet.getInt(1); + name = resultSet.getString(2); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + out.writeChars(name); + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + name = in.readUTF(); + } + + static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter { + @Override + public void setParameters(TestRow element, PreparedStatement statement) + throws SQLException { + statement.setLong(1, element.id()); + statement.setString(2, element.name()); + } + } +} diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml index e6bb357a0894..798fca092696 100644 --- a/sdks/java/io/jdbc/pom.xml +++ b/sdks/java/io/jdbc/pom.xml @@ -326,7 +326,6 @@ org.postgresql postgresql - 9.4.1212.jre7 test diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java index 32d6d9e80b41..ee27a6646a12 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java @@ -17,12 +17,14 @@ */ package org.apache.beam.sdk.io.jdbc; +import static org.apache.beam.sdk.io.common.DatabaseTestHelper.getPostgresDataSource; + import java.sql.SQLException; import java.text.ParseException; import java.util.List; - import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.DatabaseTestHelper; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.io.common.TestRow; @@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory; + /** * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance. 
* @@ -81,29 +84,15 @@ public static void setup() throws SQLException, ParseException { IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() .as(IOTestPipelineOptions.class); - dataSource = getDataSource(options); - - tableName = JdbcTestHelper.getTableName("IT"); - JdbcTestHelper.createDataTable(dataSource, tableName); - } - - private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) - throws SQLException { - PGSimpleDataSource dataSource = new PGSimpleDataSource(); - - dataSource.setDatabaseName(options.getPostgresDatabaseName()); - dataSource.setServerName(options.getPostgresServerName()); - dataSource.setPortNumber(options.getPostgresPort()); - dataSource.setUser(options.getPostgresUsername()); - dataSource.setPassword(options.getPostgresPassword()); - dataSource.setSsl(options.getPostgresSsl()); + dataSource = getPostgresDataSource(options); - return dataSource; + tableName = DatabaseTestHelper.getTestTableName("IT"); + DatabaseTestHelper.createDataTable(dataSource, tableName); } @AfterClass public static void tearDown() throws SQLException { - JdbcTestHelper.cleanUpDataTable(dataSource, tableName); + DatabaseTestHelper.cleanUpDataTable(dataSource, tableName); } /** diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index f35c8b13542f..6f212a3ee85e 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.common.DatabaseTestHelper; import org.apache.beam.sdk.io.common.TestRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -111,16 +112,16 @@ public static void startDatabase() throws Exception { dataSource.setServerName("localhost"); dataSource.setPortNumber(port); - readTableName = JdbcTestHelper.getTableName("UT_READ"); + readTableName = DatabaseTestHelper.getTestTableName("UT_READ"); - JdbcTestHelper.createDataTable(dataSource, readTableName); + DatabaseTestHelper.createDataTable(dataSource, readTableName); addInitialData(dataSource, readTableName); } @AfterClass public static void shutDownDatabase() throws Exception { try { - JdbcTestHelper.cleanUpDataTable(dataSource, readTableName); + DatabaseTestHelper.cleanUpDataTable(dataSource, readTableName); } finally { if (derbyServer != null) { derbyServer.shutdown(); @@ -253,8 +254,8 @@ public void setParameters(PreparedStatement preparedStatement) public void testWrite() throws Exception { final long rowsToAdd = 1000L; - String tableName = JdbcTestHelper.getTableName("UT_WRITE"); - JdbcTestHelper.createDataTable(dataSource, tableName); + String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE"); + DatabaseTestHelper.createDataTable(dataSource, tableName); try { ArrayList> data = new ArrayList<>(); for (int i = 0; i < rowsToAdd; i++) { @@ -290,7 +291,7 @@ public void setParameters( } } } finally { - JdbcTestHelper.cleanUpDataTable(dataSource, tableName); + DatabaseTestHelper.cleanUpDataTable(dataSource, tableName); } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java index fedae510ea24..18249894a4d6 
100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java @@ -17,15 +17,9 @@ */ package org.apache.beam.sdk.io.jdbc; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import javax.sql.DataSource; import org.apache.beam.sdk.io.common.TestRow; /** @@ -33,32 +27,6 @@ * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}. */ class JdbcTestHelper { - static String getTableName(String testIdentifier) throws ParseException { - SimpleDateFormat formatter = new SimpleDateFormat(); - formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S"); - return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date())); - } - - static void createDataTable( - DataSource dataSource, String tableName) - throws SQLException { - try (Connection connection = dataSource.getConnection()) { - try (Statement statement = connection.createStatement()) { - statement.execute( - String.format("create table %s (id INT, name VARCHAR(500))", tableName)); - } - } - } - - static void cleanUpDataTable(DataSource dataSource, String tableName) - throws SQLException { - if (tableName != null) { - try (Connection connection = dataSource.getConnection(); - Statement statement = connection.createStatement()) { - statement.executeUpdate(String.format("drop table %s", tableName)); - } - } - } static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper { @Override @@ -68,8 +36,7 @@ public TestRow mapRow(ResultSet resultSet) throws Exception { } } - static class PrepareStatementFromTestRow - implements JdbcIO.PreparedStatementSetter { + static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter { @Override public void setParameters(TestRow element, PreparedStatement statement) throws SQLException { diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 07e1b5cb9fff..ad08c07718b5 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -126,4 +126,15 @@ + + + + + org.postgresql + postgresql + 9.4.1212.jre7 + test + + + From 421aabd7e481518c9171dab86307f00a64857ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Tue, 9 Jan 2018 17:56:56 +0100 Subject: [PATCH 2/4] [BEAM-3217] Post code review changes #1 --- ...ormanceTests_HadoopInputFormatIO_IT.groovy | 74 ++++++++++++ .../sdk/io/common/DatabaseTestHelper.java | 4 +- .../apache/beam/sdk/io/common/TestRow.java | 2 +- sdks/java/io/hadoop/input-format/pom.xml | 109 +++++++++++++++--- .../inputformat/HadoopInputFormatIOIT.java | 34 ++++-- .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 4 +- .../apache/beam/sdk/io/jdbc/JdbcIOTest.java | 8 +- 7 files changed, 198 insertions(+), 37 deletions(-) create mode 100644 .test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy new file mode 100644 index 000000000000..96b67223b7b5 --- /dev/null +++ b/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import common_job_properties + +// This job runs the file-based IOs performance tests on PerfKit Benchmarker. +job('beam_PerformanceTests_HadoopInputFormatIO_IT') { + description('Runs PerfKit tests for HadoopInputFormatIO.') + + // Set default Beam job properties. + common_job_properties.setTopLevelMainJobProperties(delegate) + + // Allows triggering this build against pull requests. + common_job_properties.enablePhraseTriggeringFromPullRequest( + delegate, + 'Java HadoopInputFormatIO Performance Test', + 'Run Java HadoopInputFormatIO Performance Test') + + // Run job in postcommit every 6 hours, don't trigger every push, and + // don't email individual committers. + common_job_properties.setPostCommit( + delegate, + '0 */6 * * *', + false, + 'commits@beam.apache.org', + false) + + def pipelineArgs = [ + project: 'apache-beam-testing', + tempRoot: 'gs://temp-storage-for-perf-tests', + postgresPort: '5432', + numberOfRecords: '600000' + ] + def pipelineArgList = [] + pipelineArgs.each({ + key, value -> pipelineArgList.add("\"--$key=$value\"") + }) + def pipelineArgsJoined = "[" + pipelineArgList.join(',') + "]" + + def argMap = [ + beam_it_timeout: '1200', + benchmarks: 'beam_integration_benchmark', + beam_it_profile: 'io-it', + beam_prebuilt: 'true', + beam_sdk: 'java', + beam_it_module: 'sdks/java/io/hadoop/input-format', + beam_it_class: "org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT", + beam_it_options: pipelineArgsJoined, + beam_kubernetes_scripts: makePathAbsolute('.test-infra/kubernetes/postgres/postgres.yml'), + beam_options_config_file: makePathAbsolute('.test-infra/kubernetes/postgres/pkb-config.yml'), + bigquery_table: 'beam_performance.HadoopInputFormatIOIT_pkb_results' + ] + + common_job_properties.buildPerformanceTest(delegate, argMap) +} + +static def makePathAbsolute(String path) { + return '"$WORKSPACE/src/' + path + '"' +} diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java index 7a08f465424d..d69654a25fa9 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java @@ -41,7 +41,7 @@ public static PGSimpleDataSource getPostgresDataSource(IOTestPipelineOptions opt return dataSource; } - public static void createDataTable(DataSource dataSource, String tableName) + public static void createTable(DataSource dataSource, String tableName) throws SQLException { try (Connection connection = dataSource.getConnection()) { try (Statement statement = connection.createStatement()) { @@ -51,7 +51,7 @@ public static void createDataTable(DataSource dataSource, String tableName) } } - public static void 
cleanUpDataTable(DataSource dataSource, String tableName) + public static void deleteTable(DataSource dataSource, String tableName) throws SQLException { if (tableName != null) { try (Connection connection = dataSource.getConnection(); diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java index 1e3a45dc32a1..015c19ef03e0 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java @@ -97,7 +97,7 @@ public void processElement(ProcessContext c) { private static final Map EXPECTED_HASHES = ImmutableMap.of( 1000, "7d94d63a41164be058a9680002914358", 100_000, "c7cbddb319209e200f1c5eebef8fe960", - 1_000_000, "205893d26b6b9753e3df62d52ad419f6" + 600_000, "e2add2f680de9024e9bc46cd3912545e" ); /** diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml index 16e0b76915e6..931e11e475fa 100644 --- a/sdks/java/io/hadoop/input-format/pom.xml +++ b/sdks/java/io/hadoop/input-format/pom.xml @@ -47,20 +47,16 @@ + mvn verify -Dio-it-suite -pl sdks/java/io/hadoop/input-format + -DpkbLocation="path-to-pkb.py" \ + -DintegrationTestPipelineOptions='["-tempRoot=gs://bucket/staging"]' + --> io-it-suite @@ -119,19 +115,96 @@ ${pkbBeamRunnerOption} -beam_it_module=sdks/java/io/hadoop/input-format - -beam_it_class=${ioItClass} + -beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT + -beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config.yml + -beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml + + -beam_it_options=${integrationTestPipelineOptions} + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire-plugin.version} + + true + + + + + + + + + io-it-suite-local + io-it-suite-local + + + ${project.parent.parent.parent.parent.parent.basedir} + + + + + org.codehaus.gmaven + groovy-maven-plugin + ${groovy-maven-plugin.version} + + + find-supported-python-for-compile + initialize + + execute + + + ${beamRootProjectDir}/sdks/python/findSupportedPython.groovy + + + + + + + org.codehaus.mojo + exec-maven-plugin + ${maven-exec-plugin.version} + + + verify + + exec + + + + + ${python.interpreter.bin} + + ${pkbLocation} + -benchmarks=beam_integration_benchmark + -beam_it_profile=io-it + -beam_location=${beamRootProjectDir} + -beam_prebuilt=true + -beam_sdk=java + -kubeconfig=${kubeconfig} + -kubectl=${kubectl} + + ${pkbBeamRunnerProfile} + ${pkbBeamRunnerOption} + + -beam_it_module=sdks/java/io/hadoop/input-format + -beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT -beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config-local.yml -beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml,${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml -beam_it_options=${integrationTestPipelineOptions} - - -beam_extra_mvn_properties=${pkbExtraProperties} + org.apache.maven.plugins maven-surefire-plugin diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java index e8a036bc8b59..687f48ca0359 100644 --- 
a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn; import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn; +import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount; import static org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable.PrepareStatementFromTestRow; import java.sql.SQLException; @@ -49,8 +50,22 @@ import org.junit.Test; import org.postgresql.ds.PGSimpleDataSource; + /** - * IOIT for {@link org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO}. + * A test of {@link org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO} + * on an independent postgres instance. + * + *
<p>This test requires a running instance of Postgres. Pass in connection information using + * PipelineOptions: + * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/hadoop/input-format/ -DintegrationTestPipelineOptions='[
+ *  "--postgresServerName=1.2.3.4",
+ *  "--postgresUsername=postgres",
+ *  "--postgresDatabaseName=myfancydb",
+ *  "--postgresPassword=mypass",
+ *  "--postgresSsl=false",
+ *  "--numberOfRecords=1000" ]'
+ * </pre>
*/ public class HadoopInputFormatIOIT { @@ -75,7 +90,7 @@ public static void setUp() throws SQLException { numberOfRows = options.getNumberOfRecords(); tableName = DatabaseTestHelper.getTestTableName("HadoopInputFormatIOIT"); - DatabaseTestHelper.createDataTable(dataSource, tableName); + DatabaseTestHelper.createTable(dataSource, tableName); setupHadoopConfiguration(options); } @@ -101,32 +116,31 @@ private static void setupHadoopConfiguration(IOTestPipelineOptions options) { @AfterClass public static void tearDown() throws SQLException { - DatabaseTestHelper.cleanUpDataTable(dataSource, tableName); + DatabaseTestHelper.deleteTable(dataSource, tableName); } @Test - public void writeThenReadUsingDBInputFormat() { + public void readUsingHadoopInputFormat() { writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows)) .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn())) - .apply(JdbcIO.write() + .apply("Prevent fusion before writing", Reshuffle.viaRandomKey()) + .apply("Write using JDBCIO", JdbcIO.write() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) .withStatement(String.format("insert into %s values(?, ?)", tableName)) .withPreparedStatementSetter(new PrepareStatementFromTestRow())); - writePipeline.run().waitUntilFinish(); PCollection consolidatedHashcode = readPipeline - .apply("Read using DBInputFormat", HadoopInputFormatIO + .apply("Read using HadoopInputFormat", HadoopInputFormatIO .read() .withConfiguration(hadoopConfiguration.get())) .apply("Get values only", Values.create()) .apply("Values as string", ParDo.of(new SelectNameFn())) - .apply("Calculate hashcode", Combine.globally(new HashingFn())) - .apply(Reshuffle.viaRandomKey()); + .apply("Calculate hashcode", Combine.globally(new HashingFn())); PAssert.thatSingleton(consolidatedHashcode) - .isEqualTo(TestRow.getExpectedHashForRowCount(numberOfRows.intValue())); + .isEqualTo(getExpectedHashForRowCount(numberOfRows.intValue())); readPipeline.run().waitUntilFinish(); } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java index ee27a6646a12..e22dcd7f9753 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java @@ -87,12 +87,12 @@ public static void setup() throws SQLException, ParseException { dataSource = getPostgresDataSource(options); tableName = DatabaseTestHelper.getTestTableName("IT"); - DatabaseTestHelper.createDataTable(dataSource, tableName); + DatabaseTestHelper.createTable(dataSource, tableName); } @AfterClass public static void tearDown() throws SQLException { - DatabaseTestHelper.cleanUpDataTable(dataSource, tableName); + DatabaseTestHelper.deleteTable(dataSource, tableName); } /** diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 6f212a3ee85e..4871f20a58ef 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -114,14 +114,14 @@ public static void startDatabase() throws Exception { readTableName = DatabaseTestHelper.getTestTableName("UT_READ"); - DatabaseTestHelper.createDataTable(dataSource, readTableName); + DatabaseTestHelper.createTable(dataSource, readTableName); 
addInitialData(dataSource, readTableName); } @AfterClass public static void shutDownDatabase() throws Exception { try { - DatabaseTestHelper.cleanUpDataTable(dataSource, readTableName); + DatabaseTestHelper.deleteTable(dataSource, readTableName); } finally { if (derbyServer != null) { derbyServer.shutdown(); @@ -255,7 +255,7 @@ public void testWrite() throws Exception { final long rowsToAdd = 1000L; String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE"); - DatabaseTestHelper.createDataTable(dataSource, tableName); + DatabaseTestHelper.createTable(dataSource, tableName); try { ArrayList> data = new ArrayList<>(); for (int i = 0; i < rowsToAdd; i++) { @@ -291,7 +291,7 @@ public void setParameters( } } } finally { - DatabaseTestHelper.cleanUpDataTable(dataSource, tableName); + DatabaseTestHelper.deleteTable(dataSource, tableName); } } From 67cff1461d649322366378f4370d25c773a65466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Wed, 10 Jan 2018 11:11:52 +0100 Subject: [PATCH 3/4] [BEAM-3217] Fix jenkins job configuration --- .../job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy index 96b67223b7b5..fb4420461f67 100644 --- a/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy +++ b/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy @@ -53,6 +53,7 @@ job('beam_PerformanceTests_HadoopInputFormatIO_IT') { def pipelineArgsJoined = "[" + pipelineArgList.join(',') + "]" def argMap = [ + kubeconfig: '/home/jenkins/.kube/config', beam_it_timeout: '1200', benchmarks: 'beam_integration_benchmark', beam_it_profile: 'io-it', @@ -70,5 +71,5 @@ job('beam_PerformanceTests_HadoopInputFormatIO_IT') { } static def makePathAbsolute(String path) { - return '"$WORKSPACE/src/' + path + '"' + return '"$WORKSPACE/' + path + '"' } From cc4918862d5061dbe955ff1be4e22e2b40fcae94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Wed, 17 Jan 2018 14:35:19 +0100 Subject: [PATCH 4/4] [BEAM-3217] Remove Jenkins job for the test The kubernetes infrastructure that is needed for the jenkins job to run is not available for now. We should add it once the infrastructure is there. --- ...ormanceTests_HadoopInputFormatIO_IT.groovy | 75 ------------------- 1 file changed, 75 deletions(-) delete mode 100644 .test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy deleted file mode 100644 index fb4420461f67..000000000000 --- a/.test-infra/jenkins/job_beam_PerformanceTests_HadoopInputFormatIO_IT.groovy +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import common_job_properties - -// This job runs the file-based IOs performance tests on PerfKit Benchmarker. -job('beam_PerformanceTests_HadoopInputFormatIO_IT') { - description('Runs PerfKit tests for HadoopInputFormatIO.') - - // Set default Beam job properties. - common_job_properties.setTopLevelMainJobProperties(delegate) - - // Allows triggering this build against pull requests. - common_job_properties.enablePhraseTriggeringFromPullRequest( - delegate, - 'Java HadoopInputFormatIO Performance Test', - 'Run Java HadoopInputFormatIO Performance Test') - - // Run job in postcommit every 6 hours, don't trigger every push, and - // don't email individual committers. - common_job_properties.setPostCommit( - delegate, - '0 */6 * * *', - false, - 'commits@beam.apache.org', - false) - - def pipelineArgs = [ - project: 'apache-beam-testing', - tempRoot: 'gs://temp-storage-for-perf-tests', - postgresPort: '5432', - numberOfRecords: '600000' - ] - def pipelineArgList = [] - pipelineArgs.each({ - key, value -> pipelineArgList.add("\"--$key=$value\"") - }) - def pipelineArgsJoined = "[" + pipelineArgList.join(',') + "]" - - def argMap = [ - kubeconfig: '/home/jenkins/.kube/config', - beam_it_timeout: '1200', - benchmarks: 'beam_integration_benchmark', - beam_it_profile: 'io-it', - beam_prebuilt: 'true', - beam_sdk: 'java', - beam_it_module: 'sdks/java/io/hadoop/input-format', - beam_it_class: "org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT", - beam_it_options: pipelineArgsJoined, - beam_kubernetes_scripts: makePathAbsolute('.test-infra/kubernetes/postgres/postgres.yml'), - beam_options_config_file: makePathAbsolute('.test-infra/kubernetes/postgres/pkb-config.yml'), - bigquery_table: 'beam_performance.HadoopInputFormatIOIT_pkb_results' - ] - - common_job_properties.buildPerformanceTest(delegate, argMap) -} - -static def makePathAbsolute(String path) { - return '"$WORKSPACE/' + path + '"' -}
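
For readers tracing the new integration test, the read path it exercises reduces to the sketch below. This is illustrative only, not part of the patch: the connection values and table name are placeholders (the test derives them from IOTestPipelineOptions), and it assumes TestRowDBWritable from this patch is on the classpath in the same package. Every API call shown appears verbatim in HadoopInputFormatIOIT above.

    // Minimal sketch of reading Postgres rows through DBInputFormat with
    // HadoopInputFormatIO, mirroring the configuration built in the IT's
    // setupHadoopConfiguration(). Placed in the same package so the patch's
    // TestRowDBWritable resolves without an import.
    package org.apache.beam.sdk.io.hadoop.inputformat;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class ReadViaDBInputFormatExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder connection details; the IT fills these in from
        // IOTestPipelineOptions via DatabaseTestHelper.getPostgresDBUrl().
        DBConfiguration.configureDB(conf, "org.postgresql.Driver",
            "jdbc:postgresql://localhost:5432/myfancydb", "postgres", "mypass");
        conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, "beamtest_example");
        conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
        conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY,
            TestRowDBWritable.class, DBWritable.class);
        // Key/value/input-format classes expected by HadoopInputFormatIO.
        conf.setClass("key.class", LongWritable.class, Object.class);
        conf.setClass("value.class", TestRowDBWritable.class, Object.class);
        conf.setClass("mapreduce.job.inputformat.class",
            DBInputFormat.class, InputFormat.class);

        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Read KV<LongWritable, TestRowDBWritable> pairs, then keep the values,
        // just as the IT does before hashing them.
        PCollection<TestRowDBWritable> rows = p
            .apply(HadoopInputFormatIO.<LongWritable, TestRowDBWritable>read()
                .withConfiguration(conf))
            .apply(Values.<TestRowDBWritable>create());
        p.run().waitUntilFinish();
      }
    }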