diff --git a/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy b/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy
new file mode 100644
index 000000000000..6724c7d9306e
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import Kubernetes
+import InfluxDBCredentialsHelper
+
+String jobName = "beam_PerformanceTests_Cdap"
+
+job(jobName) {
+ common.setTopLevelMainJobProperties(delegate)
+ common.setAutoJob(delegate, 'H H/6 * * *')
+ common.enablePhraseTriggeringFromPullRequest(
+ delegate,
+ 'Java CdapIO Performance Test',
+ 'Run Java CdapIO Performance Test')
+ InfluxDBCredentialsHelper.useCredentials(delegate)
+
+ String namespace = common.getKubernetesNamespace(jobName)
+ String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
+ Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
+
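+ // Deploy a standalone Postgres service to the test cluster and resolve its
+ // LoadBalancer IP into the job environment variable LOAD_BALANCER_IP.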
+ k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml"))
+ String postgresHostName = "LOAD_BALANCER_IP"
+ k8s.loadBalancerIP("postgres-for-dev", postgresHostName)
+
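+ // Options forwarded to CdapIOIT; run metrics are published to BigQuery and InfluxDB.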
+ Map pipelineOptions = [
+ tempRoot : 'gs://temp-storage-for-perf-tests',
+ project : 'apache-beam-testing',
+ runner : 'DataflowRunner',
+ numberOfRecords : '600000',
+ bigQueryDataset : 'beam_performance',
+ bigQueryTable : 'cdapioit_results',
+ influxMeasurement : 'cdapioit_results',
+ influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ postgresUsername : 'postgres',
+ postgresPassword : 'uuinkks',
+ postgresDatabaseName : 'postgres',
+ postgresServerName : "\$${postgresHostName}",
+ postgresSsl : false,
+ postgresPort : '5432',
+ numWorkers : '5',
+ autoscalingAlgorithm : 'NONE'
+ ]
+
+ steps {
+ gradle {
+ rootBuildScriptDir(common.checkoutDir)
+ common.setGradleSwitches(delegate)
+ switches("--info")
+ switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
+ switches("-DintegrationTestRunner=dataflow")
+ tasks(":sdks:java:io:cdap:integrationTest --tests org.apache.beam.sdk.io.cdap.CdapIOIT")
+ }
+ }
+}
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
index a2781cf5b016..1bcc0ece146b 100644
--- a/sdks/java/io/cdap/build.gradle
+++ b/sdks/java/io/cdap/build.gradle
@@ -67,7 +67,10 @@ dependencies {
testImplementation library.java.vendored_guava_26_0_jre
testImplementation library.java.junit
testImplementation library.java.mockito_core
+ testImplementation library.java.testcontainers_postgresql
testImplementation project(":sdks:java:io:hadoop-common")
testImplementation project(":sdks:java:io:hadoop-format")
+ testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+ testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
new file mode 100644
index 000000000000..1b30ba712f46
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+
+import com.google.cloud.Timestamp;
+import io.cdap.plugin.common.Constants;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * IO Integration test for {@link org.apache.beam.sdk.io.cdap.CdapIO}.
+ *
+ * <p>{@see https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests} for
+ * more details.
+ */
+@RunWith(JUnit4.class)
+@SuppressWarnings("rawtypes")
+public class CdapIOIT {
+
+ private static final String NAMESPACE = CdapIOIT.class.getName();
+ private static final String[] TEST_FIELD_NAMES = new String[] {"id", "name"};
+ private static final String TEST_ORDER_BY = "id ASC";
+
+ private static PGSimpleDataSource dataSource;
+ private static Integer numberOfRows;
+ private static String tableName;
+ private static InfluxDBSettings settings;
+ private static CdapIOITOptions options;
+ private static PostgreSQLContainer postgreSQLContainer;
+
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
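+ // When withTestcontainers is set, a local Postgres container is started and
+ // metrics publishing is skipped; otherwise the externally provided Postgres
+ // and InfluxDB settings from the pipeline options are used.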
+ @BeforeClass
+ public static void setup() throws Exception {
+ options = readIOTestPipelineOptions(CdapIOITOptions.class);
+ if (options.isWithTestcontainers()) {
+ setPostgresContainer();
+ }
+
+ dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+ numberOfRows = options.getNumberOfRecords();
+ tableName = DatabaseTestHelper.getTestTableName("CdapIOIT");
+ if (!options.isWithTestcontainers()) {
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
+ }
+ executeWithRetry(CdapIOIT::createTable);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ executeWithRetry(CdapIOIT::deleteTable);
+ if (postgreSQLContainer != null) {
+ postgreSQLContainer.stop();
+ }
+ }
+
+ @Test
+ public void testCdapIOReadsAndWritesCorrectlyInBatch() {
+
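+ // Write deterministic rows through CdapIO, then read them back and verify a
+ // combined hash of the values against the expected hash for the row count.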
+ writePipeline
+ .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
+ .apply("Produce db rows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+ .apply("Prevent fusion before writing", Reshuffle.viaRandomKey())
+ .apply("Collect write time", ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
+ .apply("Construct rows for DBOutputFormat", ParDo.of(new ConstructDBOutputFormatRowFn()))
+ .apply("Write using CdapIO", writeToDB(getWriteTestParamsFromOptions(options)));
+
+ PipelineResult writeResult = writePipeline.run();
+ writeResult.waitUntilFinish();
+
+ PCollection<String> consolidatedHashcode =
+ readPipeline
+ .apply("Read using CdapIO", readFromDB(getReadTestParamsFromOptions(options)))
+ .apply("Collect read time", ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")))
+ .apply("Get values only", Values.create())
+ .apply("Values as string", ParDo.of(new TestRow.SelectNameFn()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+ PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
+
+ PipelineResult readResult = readPipeline.run();
+ readResult.waitUntilFinish();
+
+ if (!options.isWithTestcontainers()) {
+ collectAndPublishMetrics(writeResult, readResult);
+ }
+ }
+
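+ /** Builds a CdapIO.Write backed by Hadoop's DBOutputFormat via the test DBBatchSink plugin. */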
+ private CdapIO.Write<TestRowDBWritable, NullWritable> writeToDB(Map<String, Object> params) {
+ DBConfig pluginConfig = new ConfigWrapper<>(DBConfig.class).withParams(params).build();
+
+ return CdapIO.write()
+ .withCdapPlugin(
+ Plugin.create(DBBatchSink.class, DBOutputFormat.class, DBOutputFormatProvider.class))
+ .withPluginConfig(pluginConfig)
+ .withKeyClass(TestRowDBWritable.class)
+ .withValueClass(NullWritable.class)
+ .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath());
+ }
+
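+ /** Builds a CdapIO.Read backed by Hadoop's DBInputFormat via the test DBBatchSource plugin. */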
+ private CdapIO.Read<LongWritable, TestRowDBWritable> readFromDB(Map<String, Object> params) {
+ DBConfig pluginConfig = new ConfigWrapper<>(DBConfig.class).withParams(params).build();
+
+ return CdapIO.read()
+ .withCdapPlugin(
+ Plugin.create(DBBatchSource.class, DBInputFormat.class, DBInputFormatProvider.class))
+ .withPluginConfig(pluginConfig)
+ .withKeyClass(LongWritable.class)
+ .withValueClass(TestRowDBWritable.class);
+ }
+
+ private Map<String, Object> getTestParamsFromOptions(CdapIOITOptions options) {
+ Map<String, Object> params = new HashMap<>();
+ params.put(DBConfig.DB_URL, DatabaseTestHelper.getPostgresDBUrl(options));
+ params.put(DBConfig.POSTGRES_USERNAME, options.getPostgresUsername());
+ params.put(DBConfig.POSTGRES_PASSWORD, options.getPostgresPassword());
+ params.put(DBConfig.FIELD_NAMES, StringUtils.arrayToString(TEST_FIELD_NAMES));
+ params.put(DBConfig.TABLE_NAME, tableName);
+ params.put(Constants.Reference.REFERENCE_NAME, "referenceName");
+ return params;
+ }
+
+ private Map<String, Object> getReadTestParamsFromOptions(CdapIOITOptions options) {
+ Map<String, Object> params = getTestParamsFromOptions(options);
+ params.put(DBConfig.ORDER_BY, TEST_ORDER_BY);
+ params.put(DBConfig.VALUE_CLASS_NAME, TestRowDBWritable.class.getName());
+ return params;
+ }
+
+ private Map<String, Object> getWriteTestParamsFromOptions(CdapIOITOptions options) {
+ Map<String, Object> params = getTestParamsFromOptions(options);
+ params.put(DBConfig.FIELD_COUNT, String.valueOf(TEST_FIELD_NAMES.length));
+ return params;
+ }
+
+ /** Pipeline options specific for this test. */
+ public interface CdapIOITOptions extends PostgresIOTestPipelineOptions {
+
+ @Description("Whether to use testcontainers")
+ @Default.Boolean(false)
+ Boolean isWithTestcontainers();
+
+ void setWithTestcontainers(Boolean withTestcontainers);
+ }
+
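+ // Starts a throwaway Postgres container and points the pipeline options at it.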
+ private static void setPostgresContainer() {
+ postgreSQLContainer =
+ new PostgreSQLContainer(DockerImageName.parse("postgres").withTag("latest"))
+ .withDatabaseName(options.getPostgresDatabaseName())
+ .withUsername(options.getPostgresUsername())
+ .withPassword(options.getPostgresPassword());
+ postgreSQLContainer.start();
+ options.setPostgresServerName(postgreSQLContainer.getContainerIpAddress());
+ options.setPostgresPort(postgreSQLContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT));
+ options.setPostgresSsl(false);
+ }
+
+ private static void createTable() throws SQLException {
+ DatabaseTestHelper.createTable(dataSource, tableName);
+ }
+
+ private static void deleteTable() throws SQLException {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+
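+ // Extracts read/write run times (and the written table size) from pipeline
+ // metrics and publishes them to InfluxDB.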
+ private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
+ String uuid = UUID.randomUUID().toString();
+ String timestamp = Timestamp.now().toString();
+
+ Set<Function<MetricsReader, NamedTestResult>> readSuppliers = getReadSuppliers(uuid, timestamp);
+ Set<Function<MetricsReader, NamedTestResult>> writeSuppliers = getWriteSuppliers(uuid, timestamp);
+
+ IOITMetrics readMetrics =
+ new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp);
+ IOITMetrics writeMetrics =
+ new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);
+ readMetrics.publishToInflux(settings);
+ writeMetrics.publishToInflux(settings);
+ }
+
+ private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(
+ String uuid, String timestamp) {
+ Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
+ suppliers.add(getTimeMetric(uuid, timestamp, "read_time"));
+ return suppliers;
+ }
+
+ private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(
+ String uuid, String timestamp) {
+ Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
+ suppliers.add(getTimeMetric(uuid, timestamp, "write_time"));
+ suppliers.add(
+ reader ->
+ NamedTestResult.create(
+ uuid,
+ timestamp,
+ "data_size",
+ DatabaseTestHelper.getPostgresTableSize(dataSource, tableName)
+ .orElseThrow(() -> new IllegalStateException("Unable to fetch table size"))));
+ return suppliers;
+ }
+
+ private Function<MetricsReader, NamedTestResult> getTimeMetric(
+ final String uuid, final String timestamp, final String metricName) {
+ return reader -> {
+ long startTime = reader.getStartTimeMetric(metricName);
+ long endTime = reader.getEndTimeMetric(metricName);
+ return NamedTestResult.create(uuid, timestamp, metricName, (endTime - startTime) / 1e3);
+ };
+ }
+
+ /**
+ * Uses the input {@link TestRow} values as seeds to produce new {@link KV}s for {@link CdapIO}.
+ */
+ static class ConstructDBOutputFormatRowFn
+ extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(
+ KV.of(new TestRowDBWritable(c.element().id(), c.element().name()), NullWritable.get()));
+ }
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java
new file mode 100644
index 000000000000..979240f6a471
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.batch.Output;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+
+/** Imitation of a CDAP {@link BatchSink} plugin, used in the {@link CdapIO#write()} integration test. */
+@Plugin(type = BatchSink.PLUGIN_TYPE)
+@Name(DBBatchSink.NAME)
+@Description("Plugin writes in batch")
+public class DBBatchSink extends BatchSink<StructuredRecord, String, String> {
+
+ public static final String ID_FIELD = "id";
+ public static final String NAME_FIELD = "name";
+ public static final String NAME = "DBSink";
+
+ private final DBConfig config;
+
+ public DBBatchSink(DBConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+ config.validate(collector);
+ }
+
+ @Override
+ public void prepareRun(BatchSinkContext context) {
+ FailureCollector collector = context.getFailureCollector();
+ config.validate(collector);
+ collector.getOrThrowException();
+ context.addOutput(Output.of(config.referenceName, new DBOutputFormatProvider(config)));
+ }
+
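+ // Converts each StructuredRecord into the key/value pair written by the output format.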
+ @Override
+ public void transform(StructuredRecord input, Emitter<KeyValue<String, String>> emitter)
+ throws Exception {
+ emitter.emit(new KeyValue<>(input.get(ID_FIELD), input.get(NAME_FIELD)));
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java
new file mode 100644
index 000000000000..ab24b8f8efe2
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.batch.Input;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.plugin.common.IdUtils;
+import io.cdap.plugin.common.LineageRecorder;
+import java.util.stream.Collectors;
+
+/**
+ * Imitation of a CDAP {@link BatchSource} plugin, used in the {@link CdapIO#read()} integration test.
+ */
+@Plugin(type = BatchSource.PLUGIN_TYPE)
+@Name(DBBatchSource.NAME)
+@Description("Plugin reads in batch")
+public class DBBatchSource extends BatchSource<LongWritable, TestRowDBWritable, StructuredRecord> {
+
+ private final DBConfig config;
+
+ public static final String NAME = "DBSource";
+
+ public DBBatchSource(DBConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ validateConfiguration(pipelineConfigurer.getStageConfigurer().getFailureCollector());
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
+ }
+
+ /**
+ * Prepares DB objects, as it would be implemented in a real CDAP plugin.
+ *
+ * @param context the batch source context
+ */
+ @Override
+ public void prepareRun(BatchSourceContext context) {
+ validateConfiguration(context.getFailureCollector());
+ LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
+ lineageRecorder.createExternalDataset(config.getSchema());
+ lineageRecorder.recordRead(
+ "Reads",
+ "Reading DB objects",
+ config.getSchema().getFields().stream()
+ .map(Schema.Field::getName)
+ .collect(Collectors.toList()));
+ context.setInput(Input.of(NAME, new DBInputFormatProvider(config)));
+ }
+
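+ // Converts each key/value pair produced by the input format into a StructuredRecord.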
+ @Override
+ public void transform(
+ KeyValue<LongWritable, TestRowDBWritable> input, Emitter<StructuredRecord> emitter) {
+ StructuredRecord.Builder builder = StructuredRecord.builder(config.getSchema());
+ builder.set("id", input.getKey());
+ builder.set("name", input.getValue());
+ emitter.emit(builder.build());
+ }
+
+ private void validateConfiguration(FailureCollector failureCollector) {
+ IdUtils.validateReferenceName(config.referenceName, failureCollector);
+ config.validate(failureCollector);
+ failureCollector.getOrThrowException();
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java
new file mode 100644
index 000000000000..d95c941da22e
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.ReferencePluginConfig;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * {@link io.cdap.cdap.api.plugin.PluginConfig} for {@link DBBatchSource} and {@link DBBatchSink}
+ * CDAP plugins, used in the {@link CdapIO#read()} and {@link CdapIO#write()} integration tests.
+ */
+public class DBConfig extends ReferencePluginConfig {
+
+ public static final String DB_URL = "dbUrl";
+ public static final String POSTGRES_USERNAME = "pgUsername";
+ public static final String POSTGRES_PASSWORD = "pgPassword";
+ public static final String TABLE_NAME = "tableName";
+ public static final String FIELD_NAMES = "fieldNames";
+ public static final String FIELD_COUNT = "fieldCount";
+ public static final String ORDER_BY = "orderBy";
+ public static final String VALUE_CLASS_NAME = "valueClassName";
+
+ @Name(DB_URL)
+ @Macro
+ public String dbUrl;
+
+ @Name(POSTGRES_USERNAME)
+ @Macro
+ public String pgUsername;
+
+ @Name(POSTGRES_PASSWORD)
+ @Macro
+ public String pgPassword;
+
+ @Name(TABLE_NAME)
+ @Macro
+ public String tableName;
+
+ @Name(FIELD_NAMES)
+ @Macro
+ public String fieldNames;
+
+ @Name(FIELD_COUNT)
+ @Macro
+ public String fieldCount;
+
+ @Name(ORDER_BY)
+ @Macro
+ public String orderBy;
+
+ @Name(VALUE_CLASS_NAME)
+ @Macro
+ public String valueClassName;
+
+ public DBConfig(
+ String referenceName,
+ String dbUrl,
+ String pgUsername,
+ String pgPassword,
+ String tableName,
+ String fieldNames,
+ String fieldCount,
+ String orderBy,
+ String valueClassName) {
+ super(referenceName);
+ this.dbUrl = dbUrl;
+ this.pgUsername = pgUsername;
+ this.pgPassword = pgPassword;
+ this.tableName = tableName;
+ this.fieldNames = fieldNames;
+ this.fieldCount = fieldCount;
+ this.orderBy = orderBy;
+ this.valueClassName = valueClassName;
+ }
+
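+ /** Returns a schema matching the two-column (id, name) test table. */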
+ public Schema getSchema() {
+ Set<Schema.Field> schemaFields = new HashSet<>();
+ schemaFields.add(Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+ schemaFields.add(Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+ return Schema.recordOf("etlSchemaBody", schemaFields);
+ }
+
+ public void validate(FailureCollector failureCollector) {
+ if (dbUrl == null) {
+ failureCollector.addFailure("DB URL must be not null.", null).withConfigProperty(DB_URL);
+ }
+ if (pgUsername == null) {
+ failureCollector
+ .addFailure("Postgres username must be not null.", null)
+ .withConfigProperty(POSTGRES_USERNAME);
+ }
+ if (pgPassword == null) {
+ failureCollector
+ .addFailure("Postgres password must be not null.", null)
+ .withConfigProperty(POSTGRES_PASSWORD);
+ }
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java
new file mode 100644
index 000000000000..f4ddd881ca16
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_CLASS_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY;
+
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+
+/**
+ * {@link InputFormatProvider} for the {@link DBBatchSource} CDAP plugin, used in the
+ * {@link CdapIO#read()} integration test.
+ */
+public class DBInputFormatProvider implements InputFormatProvider {
+
+ private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver";
+
+ private final Map<String, String> conf;
+
+ DBInputFormatProvider(DBConfig config) {
+ this.conf = new HashMap<>();
+
+ conf.put(DRIVER_CLASS_PROPERTY, POSTGRESQL_DRIVER);
+ conf.put(URL_PROPERTY, config.dbUrl);
+ conf.put(USERNAME_PROPERTY, config.pgUsername);
+ conf.put(PASSWORD_PROPERTY, config.pgPassword);
+
+ conf.put(INPUT_TABLE_NAME_PROPERTY, config.tableName);
+ conf.put(INPUT_FIELD_NAMES_PROPERTY, config.fieldNames);
+ conf.put(INPUT_ORDER_BY_PROPERTY, config.orderBy);
+ conf.put(INPUT_CLASS_PROPERTY, config.valueClassName);
+
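+ // HadoopFormatIO, which CdapIO uses underneath, reads a job id from this
+ // configuration; a fixed value is sufficient for the test.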
+ conf.put(HadoopFormatIO.JOB_ID, String.valueOf(1));
+ }
+
+ @Override
+ public String getInputFormatClassName() {
+ return DBInputFormat.class.getName();
+ }
+
+ @Override
+ public Map<String, String> getInputFormatConfiguration() {
+ return conf;
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java
new file mode 100644
index 000000000000..24ec98251ae4
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY;
+import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY;
+
+import io.cdap.cdap.api.data.batch.OutputFormatProvider;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+
+/**
+ * {@link OutputFormatProvider} for the {@link DBBatchSink} CDAP plugin, used in the
+ * {@link CdapIO#write()} integration test.
+ */
+public class DBOutputFormatProvider implements OutputFormatProvider {
+
+ private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver";
+
+ private final Map<String, String> conf;
+
+ DBOutputFormatProvider(DBConfig config) {
+ this.conf = new HashMap<>();
+
+ conf.put(DRIVER_CLASS_PROPERTY, POSTGRESQL_DRIVER);
+ conf.put(URL_PROPERTY, config.dbUrl);
+ conf.put(USERNAME_PROPERTY, config.pgUsername);
+ conf.put(PASSWORD_PROPERTY, config.pgPassword);
+
+ conf.put(OUTPUT_TABLE_NAME_PROPERTY, config.tableName);
+ conf.put(OUTPUT_FIELD_COUNT_PROPERTY, config.fieldCount);
+ conf.put(OUTPUT_FIELD_NAMES_PROPERTY, config.fieldNames);
+
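+ // As in DBInputFormatProvider, HadoopFormatIO requires a job id; a fixed value suffices.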
+ conf.put(HadoopFormatIO.JOB_ID, String.valueOf(1));
+ }
+
+ @Override
+ public String getOutputFormatClassName() {
+ return DBOutputFormat.class.getName();
+ }
+
+ @Override
+ public Map<String, String> getOutputFormatConfiguration() {
+ return conf;
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java
new file mode 100644
index 000000000000..d85c5ea3e69e
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * A subclass of {@link TestRow} to be used with {@link
+ * org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
+ */
+@DefaultCoder(AvroCoder.class)
+class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+
+ private Integer id;
+ private String name;
+
+ public TestRowDBWritable() {}
+
+ public TestRowDBWritable(Integer id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ @Override
+ public Integer id() {
+ return id;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
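+ // DBWritable contract: write(PreparedStatement) fills the INSERT parameters,
+ // readFields(ResultSet) consumes one row of the SELECT.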
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ statement.setInt(1, id);
+ statement.setString(2, name);
+ }
+
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+ id = resultSet.getInt(1);
+ name = resultSet.getString(2);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(id);
+ // Use writeUTF so the value can be read back with readUTF in readFields;
+ // writeChars emits raw chars with no length prefix and is incompatible.
+ out.writeUTF(name);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readInt();
+ name = in.readUTF();
+ }
+}