EXPECTED_HASHES = ImmutableMap.of(
1000, "7d94d63a41164be058a9680002914358",
100_000, "c7cbddb319209e200f1c5eebef8fe960",
+ 600_000, "e2add2f680de9024e9bc46cd3912545e",
5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
);
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index c698b40f8b31..931e11e475fa 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -27,6 +27,197 @@
Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format
IO to read data from data sources which implement Hadoop Input Format.
+
+
+
+ dataflow-runner
+
+
+ integrationTestRunner
+ dataflow
+
+
+
+
+ org.apache.beam
+ beam-runners-google-cloud-dataflow-java
+ runtime
+
+
+
+
+
+
+ io-it-suite
+
+ io-it-suite
+
+
+
+ ${project.parent.parent.parent.parent.parent.basedir}
+
+
+
+
+ org.codehaus.gmaven
+ groovy-maven-plugin
+ ${groovy-maven-plugin.version}
+
+
+ find-supported-python-for-compile
+ initialize
+
+ execute
+
+
+ ${beamRootProjectDir}/sdks/python/findSupportedPython.groovy
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${maven-exec-plugin.version}
+
+
+ verify
+
+ exec
+
+
+
+
+ ${python.interpreter.bin}
+
+ ${pkbLocation}
+ -benchmarks=beam_integration_benchmark
+ -beam_it_profile=io-it
+ -beam_location=${beamRootProjectDir}
+ -beam_prebuilt=true
+ -beam_sdk=java
+ -kubeconfig=${kubeconfig}
+ -kubectl=${kubectl}
+
+ ${pkbBeamRunnerProfile}
+ ${pkbBeamRunnerOption}
+
+ -beam_it_module=sdks/java/io/hadoop/input-format
+ -beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT
+ -beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config.yml
+ -beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml
+
+ -beam_it_options=${integrationTestPipelineOptions}
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${surefire-plugin.version}
+
+ true
+
+
+
+
+
+
+
+
+ io-it-suite-local
+ io-it-suite-local
+
+
+ ${project.parent.parent.parent.parent.parent.basedir}
+
+
+
+
+ org.codehaus.gmaven
+ groovy-maven-plugin
+ ${groovy-maven-plugin.version}
+
+
+ find-supported-python-for-compile
+ initialize
+
+ execute
+
+
+ ${beamRootProjectDir}/sdks/python/findSupportedPython.groovy
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${maven-exec-plugin.version}
+
+
+ verify
+
+ exec
+
+
+
+
+ ${python.interpreter.bin}
+
+ ${pkbLocation}
+ -benchmarks=beam_integration_benchmark
+ -beam_it_profile=io-it
+ -beam_location=${beamRootProjectDir}
+ -beam_prebuilt=true
+ -beam_sdk=java
+ -kubeconfig=${kubeconfig}
+ -kubectl=${kubectl}
+
+ ${pkbBeamRunnerProfile}
+ ${pkbBeamRunnerOption}
+
+ -beam_it_module=sdks/java/io/hadoop/input-format
+ -beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT
+ -beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config-local.yml
+ -beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml,${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml
+
+ -beam_it_options=${integrationTestPipelineOptions}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${surefire-plugin.version}
+
+ true
+
+
+
+
+
+
+
com.google.guava
@@ -69,10 +260,31 @@
tests
test
+
+ org.apache.beam
+ beam-sdks-java-io-common
+ test
+ tests
+
+
+ org.apache.beam
+ beam-sdks-java-io-common
+ test
+
org.hamcrest
hamcrest-all
test
+
+ org.apache.beam
+ beam-sdks-java-io-jdbc
+ test
+
+
+ org.postgresql
+ postgresql
+ test
+
\ No newline at end of file
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
new file mode 100644
index 000000000000..397aa3a1e609
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import static org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn;
+import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn;
+import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable.PrepareStatementFromTestRow;
+
+import java.sql.SQLException;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO}
+ * on an independent postgres instance.
+ *
+ * This test requires a running instance of Postgres. Pass in connection information using
+ * PipelineOptions:
+ *
+ * mvn -e -Pio-it verify -pl sdks/java/io/hadoop/input-format/ -DintegrationTestPipelineOptions='[
+ * "--postgresServerName=1.2.3.4",
+ * "--postgresUsername=postgres",
+ * "--postgresDatabaseName=myfancydb",
+ * "--postgresPassword=mypass",
+ * "--postgresSsl=false",
+ * "--numberOfRecords=1000" ]'
+ *
+ */
+public class HadoopInputFormatIOIT {
+
+ private static PGSimpleDataSource dataSource;
+ private static Integer numberOfRows; // taken from the --numberOfRecords pipeline option
+ private static String tableName;
+ private static SerializableConfiguration hadoopConfiguration; // Hadoop Configuration is not Serializable itself
+
+ @Rule
+ public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule
+ public TestPipeline readPipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void setUp() throws SQLException {
+ PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+ IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+ .as(IOTestPipelineOptions.class);
+
+ dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+ numberOfRows = options.getNumberOfRecords();
+ tableName = DatabaseTestHelper.getTestTableName("HadoopInputFormatIOIT");
+
+ DatabaseTestHelper.createTable(dataSource, tableName);
+ setupHadoopConfiguration(options);
+ }
+
+ // Builds the DBInputFormat configuration pointing at the freshly created postgres table.
+ private static void setupHadoopConfiguration(IOTestPipelineOptions options) {
+ Configuration conf = new Configuration();
+ DBConfiguration.configureDB(
+ conf,
+ "org.postgresql.Driver",
+ DatabaseTestHelper.getPostgresDBUrl(options),
+ options.getPostgresUsername(),
+ options.getPostgresPassword()
+ );
+ conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+ conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class, DBWritable.class);
+
+ conf.setClass("key.class", LongWritable.class, Object.class); // key/value types emitted by DBInputFormat
+ conf.setClass("value.class", TestRowDBWritable.class, Object.class);
+ conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);
+
+ hadoopConfiguration = new SerializableConfiguration(conf);
+ }
+
+ @AfterClass
+ public static void tearDown() throws SQLException {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+
+ @Test
+ public void readUsingHadoopInputFormat() {
+ writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
+ .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
+ .apply("Prevent fusion before writing", Reshuffle.viaRandomKey())
+ .apply("Write using JDBCIO", JdbcIO.<TestRow>write()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withStatement(String.format("insert into %s values(?, ?)", tableName))
+ .withPreparedStatementSetter(new PrepareStatementFromTestRow()));
+
+ writePipeline.run().waitUntilFinish(); // rows must be committed before the read pipeline starts
+
+ PCollection<String> consolidatedHashcode = readPipeline
+ .apply("Read using HadoopInputFormat", HadoopInputFormatIO
+ .<LongWritable, TestRowDBWritable>read()
+ .withConfiguration(hadoopConfiguration.get()))
+ .apply("Get values only", Values.create())
+ .apply("Values as string", ParDo.of(new SelectNameFn()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+ PAssert.thatSingleton(consolidatedHashcode)
+ .isEqualTo(getExpectedHashForRowCount(numberOfRows));
+
+ readPipeline.run().waitUntilFinish();
+ }
+}
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
new file mode 100644
index 000000000000..c9b2639b1922
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * A subclass of {@link org.apache.beam.sdk.io.common.TestRow} to be used with
+ * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
+ */
+@DefaultCoder(AvroCoder.class)
+public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+
+  private Integer id; // "id" column of the test table
+  private String name; // "name" column of the test table
+
+  @Override public Integer id() {
+    return id;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public void write(PreparedStatement statement) throws SQLException {
+    statement.setInt(1, id);
+    statement.setString(2, name);
+  }
+
+  @Override
+  public void readFields(ResultSet resultSet) throws SQLException {
+    id = resultSet.getInt(1);
+    name = resultSet.getString(2);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+    out.writeUTF(name); // must pair with readUTF below; writeChars has no length prefix
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readInt();
+    name = in.readUTF();
+  }
+
+  /** Binds a {@link TestRow} to the two-column insert statement used by the IT write pipeline. */
+  static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
+    @Override
+    public void setParameters(TestRow element, PreparedStatement statement)
+        throws SQLException {
+      statement.setLong(1, element.id());
+      statement.setString(2, element.name());
+    }
+  }
+}
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 73fbc52590a2..6264e9e411f5 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -328,7 +328,6 @@
org.postgresql
postgresql
- 9.4.1212.jre7
test
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 941a77543d4e..ed169c722540 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -18,11 +18,10 @@
package org.apache.beam.sdk.io.jdbc;
import java.sql.SQLException;
-import java.text.ParseException;
import java.util.List;
-
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
@@ -74,34 +73,20 @@ public class JdbcIOIT {
public TestPipeline pipelineRead = TestPipeline.create();
@BeforeClass
- public static void setup() throws SQLException, ParseException {
+ public static void setup() throws SQLException {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
.as(IOTestPipelineOptions.class);
numberOfRows = options.getNumberOfRecords();
- dataSource = getDataSource(options);
-
- tableName = JdbcTestHelper.getTableName("IT");
- JdbcTestHelper.createDataTable(dataSource, tableName);
- }
-
- private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) {
- PGSimpleDataSource dataSource = new PGSimpleDataSource();
-
- dataSource.setDatabaseName(options.getPostgresDatabaseName());
- dataSource.setServerName(options.getPostgresServerName());
- dataSource.setPortNumber(options.getPostgresPort());
- dataSource.setUser(options.getPostgresUsername());
- dataSource.setPassword(options.getPostgresPassword());
- dataSource.setSsl(options.getPostgresSsl());
-
- return dataSource;
+ dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+ tableName = DatabaseTestHelper.getTestTableName("IT");
+ DatabaseTestHelper.createTable(dataSource, tableName);
}
@AfterClass
public static void tearDown() throws SQLException {
- JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
}
/**
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index f35c8b13542f..4871f20a58ef 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -37,6 +37,7 @@
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -111,16 +112,16 @@ public static void startDatabase() throws Exception {
dataSource.setServerName("localhost");
dataSource.setPortNumber(port);
- readTableName = JdbcTestHelper.getTableName("UT_READ");
+ readTableName = DatabaseTestHelper.getTestTableName("UT_READ");
- JdbcTestHelper.createDataTable(dataSource, readTableName);
+ DatabaseTestHelper.createTable(dataSource, readTableName);
addInitialData(dataSource, readTableName);
}
@AfterClass
public static void shutDownDatabase() throws Exception {
try {
- JdbcTestHelper.cleanUpDataTable(dataSource, readTableName);
+ DatabaseTestHelper.deleteTable(dataSource, readTableName);
} finally {
if (derbyServer != null) {
derbyServer.shutdown();
@@ -253,8 +254,8 @@ public void setParameters(PreparedStatement preparedStatement)
public void testWrite() throws Exception {
final long rowsToAdd = 1000L;
- String tableName = JdbcTestHelper.getTableName("UT_WRITE");
- JdbcTestHelper.createDataTable(dataSource, tableName);
+ String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ DatabaseTestHelper.createTable(dataSource, tableName);
try {
ArrayList> data = new ArrayList<>();
for (int i = 0; i < rowsToAdd; i++) {
@@ -290,7 +291,7 @@ public void setParameters(
}
}
} finally {
- JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
}
}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
index fedae510ea24..18249894a4d6 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
@@ -17,15 +17,9 @@
*/
package org.apache.beam.sdk.io.jdbc;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import javax.sql.DataSource;
import org.apache.beam.sdk.io.common.TestRow;
/**
@@ -33,32 +27,6 @@
* {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
*/
class JdbcTestHelper {
- static String getTableName(String testIdentifier) throws ParseException {
- SimpleDateFormat formatter = new SimpleDateFormat();
- formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
- return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
- }
-
- static void createDataTable(
- DataSource dataSource, String tableName)
- throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("create table %s (id INT, name VARCHAR(500))", tableName));
- }
- }
- }
-
- static void cleanUpDataTable(DataSource dataSource, String tableName)
- throws SQLException {
- if (tableName != null) {
- try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
- statement.executeUpdate(String.format("drop table %s", tableName));
- }
- }
- }
static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper {
@Override
@@ -68,8 +36,7 @@ public TestRow mapRow(ResultSet resultSet) throws Exception {
}
}
- static class PrepareStatementFromTestRow
- implements JdbcIO.PreparedStatementSetter {
+ static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter {
@Override
public void setParameters(TestRow element, PreparedStatement statement)
throws SQLException {
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 0710df05d893..d643775d780a 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -127,4 +127,15 @@
+
+
+
+
+ org.postgresql
+ postgresql
+ 9.4.1212.jre7
+ test
+
+
+