From ce3a140610887b4ff67d62238a1e230dd5b24339 Mon Sep 17 00:00:00 2001 From: "vitaly.terentyev" Date: Sun, 17 Jul 2022 22:00:06 +0400 Subject: [PATCH 1/2] Add integration tests for CdapIO --- .../job_PerformanceTests_CdapIO.groovy | 72 +++++ sdks/java/io/cdap/build.gradle | 3 + .../org/apache/beam/sdk/io/cdap/CdapIOIT.java | 292 ++++++++++++++++++ .../apache/beam/sdk/io/cdap/DBBatchSink.java | 68 ++++ .../beam/sdk/io/cdap/DBBatchSource.java | 90 ++++++ .../org/apache/beam/sdk/io/cdap/DBConfig.java | 118 +++++++ .../sdk/io/cdap/DBInputFormatProvider.java | 70 +++++ .../sdk/io/cdap/DBOutputFormatProvider.java | 68 ++++ .../beam/sdk/io/cdap/TestRowDBWritable.java | 82 +++++ 9 files changed, 863 insertions(+) create mode 100644 .test-infra/jenkins/job_PerformanceTests_CdapIO.groovy create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java create mode 100644 sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java diff --git a/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy b/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy new file mode 100644 index 000000000000..3814bd437983 --- /dev/null +++ b/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import CommonJobProperties as common +import Kubernetes +import InfluxDBCredentialsHelper + +String jobName = "beam_PerformanceTests_Cdap" + +job(jobName) { + common.setTopLevelMainJobProperties(delegate) + common.setAutoJob(delegate, 'H H/6 * * *') + common.enablePhraseTriggeringFromPullRequest( + delegate, + 'Java CdapIO Performance Test', + 'Run Java CdapIO Performance Test') + InfluxDBCredentialsHelper.useCredentials(delegate) + + String namespace = common.getKubernetesNamespace(jobName) + String kubeconfig = common.getKubeconfigLocationForNamespace(namespace) + Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace) + + k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml")) + String postgresHostName = "LOAD_BALANCER_IP" + k8s.loadBalancerIP("postgres-for-dev", postgresHostName) + + Map pipelineOptions = [ + tempRoot : 'gs://temp-storage-for-perf-tests', + project : 'apache-beam-testing', + runner : 'DataflowRunner', + numberOfRecords : '600000', + bigQueryDataset : 'beam_performance', + bigQueryTable : 'cdapioit_results', + influxMeasurement : 'cdapioit_results', + influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, + influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, + postgresUsername : 'postgres', + postgresPassword : 'uuinkks', + postgresDatabaseName : 'postgres', + postgresServerName : "\$${postgresHostName}", + postgresSsl : false, + postgresPort : '5432', + numWorkers : '5', + autoscalingAlgorithm : 'NONE' + ] + + steps { + gradle { + rootBuildScriptDir(common.checkoutDir) + common.setGradleSwitches(delegate) + switches("--info") + switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'") + switches("-DintegrationTestRunner=dataflow") + tasks(":sdks:java:io:cdap:integrationTest --tests org.apache.beam.sdk.io.cdap.CdapIOIT") + } + } +} diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle index a2781cf5b016..1bcc0ece146b 100644 --- a/sdks/java/io/cdap/build.gradle +++ b/sdks/java/io/cdap/build.gradle @@ -67,7 +67,10 @@ dependencies { testImplementation library.java.vendored_guava_26_0_jre testImplementation library.java.junit testImplementation library.java.mockito_core + testImplementation library.java.testcontainers_postgresql testImplementation project(":sdks:java:io:hadoop-common") testImplementation project(":sdks:java:io:hadoop-format") + testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration") testImplementation project(path: ":runners:direct-java", configuration: "shadow") + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") } diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java new file mode 100644 index 000000000000..1b30ba712f46 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cdap; + +import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry; +import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; +import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount; + +import com.google.cloud.Timestamp; +import io.cdap.plugin.common.Constants; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.DatabaseTestHelper; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.testutils.metrics.TimeMonitor; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * IO Integration test for {@link org.apache.beam.sdk.io.cdap.CdapIO}. + * + *

{@see https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests} for + * more details. + */ +@RunWith(JUnit4.class) +@SuppressWarnings("rawtypes") +public class CdapIOIT { + + private static final String NAMESPACE = CdapIOIT.class.getName(); + private static final String[] TEST_FIELD_NAMES = new String[] {"id", "name"}; + private static final String TEST_ORDER_BY = "id ASC"; + + private static PGSimpleDataSource dataSource; + private static Integer numberOfRows; + private static String tableName; + private static InfluxDBSettings settings; + private static CdapIOITOptions options; + private static PostgreSQLContainer postgreSQLContainer; + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws Exception { + options = readIOTestPipelineOptions(CdapIOITOptions.class); + if (options.isWithTestcontainers()) { + setPostgresContainer(); + } + + dataSource = DatabaseTestHelper.getPostgresDataSource(options); + numberOfRows = options.getNumberOfRecords(); + tableName = DatabaseTestHelper.getTestTableName("CdapIOIT"); + if (!options.isWithTestcontainers()) { + settings = + InfluxDBSettings.builder() + .withHost(options.getInfluxHost()) + .withDatabase(options.getInfluxDatabase()) + .withMeasurement(options.getInfluxMeasurement()) + .get(); + } + executeWithRetry(CdapIOIT::createTable); + } + + @AfterClass + public static void tearDown() throws Exception { + executeWithRetry(CdapIOIT::deleteTable); + if (postgreSQLContainer != null) { + postgreSQLContainer.stop(); + } + } + + @Test + public void testCdapIOReadsAndWritesCorrectlyInBatch() { + + writePipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows)) + .apply("Produce db rows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Prevent fusion before writing", Reshuffle.viaRandomKey()) + .apply("Collect write time", ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time"))) + .apply("Construct rows for DBOutputFormat", ParDo.of(new ConstructDBOutputFormatRowFn())) + .apply("Write using CdapIO", writeToDB(getWriteTestParamsFromOptions(options))); + + PipelineResult writeResult = writePipeline.run(); + writeResult.waitUntilFinish(); + + PCollection consolidatedHashcode = + readPipeline + .apply("Read using CdapIO", readFromDB(getReadTestParamsFromOptions(options))) + .apply("Collect read time", ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time"))) + .apply("Get values only", Values.create()) + .apply("Values as string", ParDo.of(new TestRow.SelectNameFn())) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); + + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows)); + + PipelineResult readResult = readPipeline.run(); + readResult.waitUntilFinish(); + + if (!options.isWithTestcontainers()) { + collectAndPublishMetrics(writeResult, readResult); + } + } + + private CdapIO.Write writeToDB(Map params) { + DBConfig pluginConfig = new ConfigWrapper<>(DBConfig.class).withParams(params).build(); + + return CdapIO.write() + .withCdapPlugin( + Plugin.create(DBBatchSink.class, DBOutputFormat.class, DBOutputFormatProvider.class)) + .withPluginConfig(pluginConfig) + .withKeyClass(TestRowDBWritable.class) + .withValueClass(NullWritable.class) + .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()); + } + + private CdapIO.Read readFromDB(Map params) 
{ + DBConfig pluginConfig = new ConfigWrapper<>(DBConfig.class).withParams(params).build(); + + return CdapIO.read() + .withCdapPlugin( + Plugin.create(DBBatchSource.class, DBInputFormat.class, DBInputFormatProvider.class)) + .withPluginConfig(pluginConfig) + .withKeyClass(LongWritable.class) + .withValueClass(TestRowDBWritable.class); + } + + private Map getTestParamsFromOptions(CdapIOITOptions options) { + Map params = new HashMap<>(); + params.put(DBConfig.DB_URL, DatabaseTestHelper.getPostgresDBUrl(options)); + params.put(DBConfig.POSTGRES_USERNAME, options.getPostgresUsername()); + params.put(DBConfig.POSTGRES_PASSWORD, options.getPostgresPassword()); + params.put(DBConfig.FIELD_NAMES, StringUtils.arrayToString(TEST_FIELD_NAMES)); + params.put(DBConfig.TABLE_NAME, tableName); + params.put(Constants.Reference.REFERENCE_NAME, "referenceName"); + return params; + } + + private Map getReadTestParamsFromOptions(CdapIOITOptions options) { + Map params = getTestParamsFromOptions(options); + params.put(DBConfig.ORDER_BY, TEST_ORDER_BY); + params.put(DBConfig.VALUE_CLASS_NAME, TestRowDBWritable.class.getName()); + return params; + } + + private Map getWriteTestParamsFromOptions(CdapIOITOptions options) { + Map params = getTestParamsFromOptions(options); + params.put(DBConfig.FIELD_COUNT, String.valueOf(TEST_FIELD_NAMES.length)); + return params; + } + + /** Pipeline options specific for this test. */ + public interface CdapIOITOptions extends PostgresIOTestPipelineOptions { + + @Description("Whether to use testcontainers") + @Default.Boolean(false) + Boolean isWithTestcontainers(); + + void setWithTestcontainers(Boolean withTestcontainers); + } + + private static void setPostgresContainer() { + postgreSQLContainer = + new PostgreSQLContainer(DockerImageName.parse("postgres").withTag("latest")) + .withDatabaseName(options.getPostgresDatabaseName()) + .withUsername(options.getPostgresUsername()) + .withPassword(options.getPostgresPassword()); + postgreSQLContainer.start(); + options.setPostgresServerName(postgreSQLContainer.getContainerIpAddress()); + options.setPostgresPort(postgreSQLContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)); + options.setPostgresSsl(false); + } + + private static void createTable() throws SQLException { + DatabaseTestHelper.createTable(dataSource, tableName); + } + + private static void deleteTable() throws SQLException { + DatabaseTestHelper.deleteTable(dataSource, tableName); + } + + private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) { + String uuid = UUID.randomUUID().toString(); + String timestamp = Timestamp.now().toString(); + + Set> readSuppliers = getReadSuppliers(uuid, timestamp); + Set> writeSuppliers = + getWriteSuppliers(uuid, timestamp); + + IOITMetrics readMetrics = + new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp); + IOITMetrics writeMetrics = + new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp); + readMetrics.publishToInflux(settings); + writeMetrics.publishToInflux(settings); + } + + private Set> getReadSuppliers( + String uuid, String timestamp) { + Set> suppliers = new HashSet<>(); + suppliers.add(getTimeMetric(uuid, timestamp, "read_time")); + return suppliers; + } + + private Set> getWriteSuppliers( + String uuid, String timestamp) { + Set> suppliers = new HashSet<>(); + suppliers.add(getTimeMetric(uuid, timestamp, "write_time")); + suppliers.add( + reader -> + NamedTestResult.create( + uuid, + timestamp, + "data_size", + 
DatabaseTestHelper.getPostgresTableSize(dataSource, tableName) + .orElseThrow(() -> new IllegalStateException("Unable to fetch table size")))); + return suppliers; + } + + private Function getTimeMetric( + final String uuid, final String timestamp, final String metricName) { + return reader -> { + long startTime = reader.getStartTimeMetric(metricName); + long endTime = reader.getEndTimeMetric(metricName); + return NamedTestResult.create(uuid, timestamp, metricName, (endTime - startTime) / 1e3); + }; + } + + /** + * Uses the input {@link TestRow} values as seeds to produce new {@link KV}s for {@link CdapIO}. + */ + static class ConstructDBOutputFormatRowFn + extends DoFn> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output( + KV.of(new TestRowDBWritable(c.element().id(), c.element().name()), NullWritable.get())); + } + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java new file mode 100644 index 000000000000..979240f6a471 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSink.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cdap; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.api.batch.BatchSinkContext; + +/** Imitation of CDAP {@link BatchSink} plugin. Used for integration test {@link CdapIO#write()}. 
*/ +@Plugin(type = BatchSink.PLUGIN_TYPE) +@Name(DBBatchSink.NAME) +@Description("Plugin writes in batch") +public class DBBatchSink extends BatchSink { + + public static final String ID_FIELD = "id"; + public static final String NAME_FIELD = "name"; + public static final String NAME = "DBSink"; + + private final DBConfig config; + + public DBBatchSink(DBConfig config) { + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + config.validate(collector); + } + + @Override + public void prepareRun(BatchSinkContext context) { + FailureCollector collector = context.getFailureCollector(); + config.validate(collector); + collector.getOrThrowException(); + context.addOutput(Output.of(config.referenceName, new DBOutputFormatProvider(config))); + } + + @Override + public void transform(StructuredRecord input, Emitter> emitter) + throws Exception { + emitter.emit(new KeyValue<>(input.get(ID_FIELD), input.get(NAME_FIELD))); + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java new file mode 100644 index 000000000000..ab24b8f8efe2 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBBatchSource.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cdap; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Input; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.plugin.common.IdUtils; +import io.cdap.plugin.common.LineageRecorder; +import java.util.stream.Collectors; + +/** + * Imitation of CDAP {@link BatchSource} plugin. Used for integration test {@link CdapIO#read()}. 
+ */ +@Plugin(type = BatchSource.PLUGIN_TYPE) +@Name(DBBatchSource.NAME) +@Description("Plugin reads in batch") +public class DBBatchSource extends BatchSource { + + private final DBConfig config; + + public static final String NAME = "DBSource"; + + public DBBatchSource(DBConfig config) { + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + validateConfiguration(pipelineConfigurer.getStageConfigurer().getFailureCollector()); + pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); + } + + /** + * Prepare DB objects as it could be implemented in CDAP plugin. + * + * @param context the batch source context + */ + @Override + public void prepareRun(BatchSourceContext context) { + validateConfiguration(context.getFailureCollector()); + LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName); + lineageRecorder.createExternalDataset(config.getSchema()); + lineageRecorder.recordRead( + "Reads", + "Reading DB objects", + config.getSchema().getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.toList())); + context.setInput(Input.of(NAME, new DBInputFormatProvider(config))); + } + + @Override + public void transform(KeyValue input, Emitter emitter) { + StructuredRecord.Builder builder = StructuredRecord.builder(config.getSchema()); + builder.set("id", input.getKey()); + builder.set("name", input.getValue()); + emitter.emit(builder.build()); + } + + private void validateConfiguration(FailureCollector failureCollector) { + IdUtils.validateReferenceName(config.referenceName, failureCollector); + config.validate(failureCollector); + failureCollector.getOrThrowException(); + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java new file mode 100644 index 000000000000..d95c941da22e --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cdap; + +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.common.ReferencePluginConfig; +import java.util.HashSet; +import java.util.Set; + +/** + * {@link io.cdap.cdap.api.plugin.PluginConfig} for {@link DBBatchSource} and {@link DBBatchSink} + * CDAP plugins. Used for integration test {@link CdapIO#read()} and {@link CdapIO#write()}. 
+ */ +public class DBConfig extends ReferencePluginConfig { + + public static final String DB_URL = "dbUrl"; + public static final String POSTGRES_USERNAME = "pgUsername"; + public static final String POSTGRES_PASSWORD = "pgPassword"; + public static final String TABLE_NAME = "tableName"; + public static final String FIELD_NAMES = "fieldNames"; + public static final String FIELD_COUNT = "fieldCount"; + public static final String ORDER_BY = "orderBy"; + public static final String VALUE_CLASS_NAME = "valueClassName"; + + @Name(DB_URL) + @Macro + public String dbUrl; + + @Name(POSTGRES_USERNAME) + @Macro + public String pgUsername; + + @Name(POSTGRES_PASSWORD) + @Macro + public String pgPassword; + + @Name(TABLE_NAME) + @Macro + public String tableName; + + @Name(FIELD_NAMES) + @Macro + public String fieldNames; + + @Name(FIELD_COUNT) + @Macro + public String fieldCount; + + @Name(ORDER_BY) + @Macro + public String orderBy; + + @Name(VALUE_CLASS_NAME) + @Macro + public String valueClassName; + + public DBConfig( + String referenceName, + String dbUrl, + String pgUsername, + String pgPassword, + String tableName, + String fieldNames, + String fieldCount, + String orderBy, + String valueClassName) { + super(referenceName); + this.dbUrl = dbUrl; + this.pgUsername = pgUsername; + this.pgPassword = pgPassword; + this.tableName = tableName; + this.fieldNames = fieldNames; + this.fieldCount = fieldCount; + this.orderBy = orderBy; + this.valueClassName = valueClassName; + } + + public Schema getSchema() { + Set schemaFields = new HashSet<>(); + schemaFields.add(Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.STRING)))); + schemaFields.add(Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING)))); + return Schema.recordOf("etlSchemaBody", schemaFields); + } + + public void validate(FailureCollector failureCollector) { + if (dbUrl == null) { + failureCollector.addFailure("DB URL must be not null.", null).withConfigProperty(DB_URL); + } + if (pgUsername == null) { + failureCollector + .addFailure("Postgres username must be not null.", null) + .withConfigProperty(POSTGRES_USERNAME); + } + if (pgPassword == null) { + failureCollector + .addFailure("Postgres password must be not null.", null) + .withConfigProperty(POSTGRES_PASSWORD); + } + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java new file mode 100644 index 000000000000..f4ddd881ca16 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBInputFormatProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.cdap; + +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_CLASS_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY; + +import io.cdap.cdap.api.data.batch.InputFormatProvider; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; + +/** + * {@link InputFormatProvider} for {@link DBBatchSource} CDAP plugin. Used for integration test + * {@link CdapIO#read()}. + */ +public class DBInputFormatProvider implements InputFormatProvider { + + private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver"; + + private final Map conf; + + DBInputFormatProvider(DBConfig config) { + this.conf = new HashMap<>(); + + conf.put(DRIVER_CLASS_PROPERTY, POSTGRESQL_DRIVER); + conf.put(URL_PROPERTY, config.dbUrl); + conf.put(USERNAME_PROPERTY, config.pgUsername); + conf.put(PASSWORD_PROPERTY, config.pgPassword); + + conf.put(INPUT_TABLE_NAME_PROPERTY, config.tableName); + conf.put(INPUT_FIELD_NAMES_PROPERTY, config.fieldNames); + conf.put(INPUT_ORDER_BY_PROPERTY, config.orderBy); + conf.put(INPUT_CLASS_PROPERTY, config.valueClassName); + + conf.put(HadoopFormatIO.JOB_ID, String.valueOf(1)); + } + + @Override + public String getInputFormatClassName() { + return DBInputFormat.class.getName(); + } + + @Override + public Map getInputFormatConfiguration() { + return conf; + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java new file mode 100644 index 000000000000..24ec98251ae4 --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/DBOutputFormatProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.cdap; + +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY; +import static org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY; + +import io.cdap.cdap.api.data.batch.OutputFormatProvider; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO; +import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; + +/** + * {@link OutputFormatProvider} for {@link DBBatchSink} CDAP plugin. Used for integration test + * {@link CdapIO#write()}. + */ +public class DBOutputFormatProvider implements OutputFormatProvider { + + private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver"; + + private final Map conf; + + DBOutputFormatProvider(DBConfig config) { + this.conf = new HashMap<>(); + + conf.put(DRIVER_CLASS_PROPERTY, POSTGRESQL_DRIVER); + conf.put(URL_PROPERTY, config.dbUrl); + conf.put(USERNAME_PROPERTY, config.pgUsername); + conf.put(PASSWORD_PROPERTY, config.pgPassword); + + conf.put(OUTPUT_TABLE_NAME_PROPERTY, config.tableName); + conf.put(OUTPUT_FIELD_COUNT_PROPERTY, config.fieldCount); + conf.put(OUTPUT_FIELD_NAMES_PROPERTY, config.fieldNames); + + conf.put(HadoopFormatIO.JOB_ID, String.valueOf(1)); + } + + @Override + public String getOutputFormatClassName() { + return DBOutputFormat.class.getName(); + } + + @Override + public Map getOutputFormatConfiguration() { + return conf; + } +} diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java new file mode 100644 index 000000000000..d85c5ea3e69e --- /dev/null +++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/TestRowDBWritable.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.cdap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * A subclass of {@link TestRow} to be used with {@link + * org.apache.hadoop.mapreduce.lib.db.DBInputFormat}. + */ +@DefaultCoder(AvroCoder.class) +class TestRowDBWritable extends TestRow implements DBWritable, Writable { + + private Integer id; + private String name; + + public TestRowDBWritable() {} + + public TestRowDBWritable(Integer id, String name) { + this.id = id; + this.name = name; + } + + @Override + public Integer id() { + return id; + } + + @Override + public String name() { + return name; + } + + @Override + public void write(PreparedStatement statement) throws SQLException { + statement.setInt(1, id); + statement.setString(2, name); + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + id = resultSet.getInt(1); + name = resultSet.getString(2); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + out.writeChars(name); + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + name = in.readUTF(); + } +} From 2a23d5689fc8ec7e301ecabe0c473623a1ac34d8 Mon Sep 17 00:00:00 2001 From: "vitaly.terentyev" Date: Mon, 18 Jul 2022 10:00:59 +0400 Subject: [PATCH 2/2] Fix Spotless --- .../job_PerformanceTests_CdapIO.groovy | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy b/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy index 3814bd437983..6724c7d9306e 100644 --- a/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_CdapIO.groovy @@ -23,50 +23,50 @@ import InfluxDBCredentialsHelper String jobName = "beam_PerformanceTests_Cdap" job(jobName) { - common.setTopLevelMainJobProperties(delegate) - common.setAutoJob(delegate, 'H H/6 * * *') - common.enablePhraseTriggeringFromPullRequest( - delegate, - 'Java CdapIO Performance Test', - 'Run Java CdapIO Performance Test') - InfluxDBCredentialsHelper.useCredentials(delegate) + common.setTopLevelMainJobProperties(delegate) + common.setAutoJob(delegate, 'H H/6 * * *') + common.enablePhraseTriggeringFromPullRequest( + delegate, + 'Java CdapIO Performance Test', + 'Run Java CdapIO Performance Test') + InfluxDBCredentialsHelper.useCredentials(delegate) - String namespace = common.getKubernetesNamespace(jobName) - String kubeconfig = common.getKubeconfigLocationForNamespace(namespace) - Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace) + String namespace = common.getKubernetesNamespace(jobName) + String kubeconfig = common.getKubeconfigLocationForNamespace(namespace) + Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace) - k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml")) - String postgresHostName = "LOAD_BALANCER_IP" - k8s.loadBalancerIP("postgres-for-dev", postgresHostName) + k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml")) + String postgresHostName = "LOAD_BALANCER_IP" + 
k8s.loadBalancerIP("postgres-for-dev", postgresHostName) - Map pipelineOptions = [ - tempRoot : 'gs://temp-storage-for-perf-tests', - project : 'apache-beam-testing', - runner : 'DataflowRunner', - numberOfRecords : '600000', - bigQueryDataset : 'beam_performance', - bigQueryTable : 'cdapioit_results', - influxMeasurement : 'cdapioit_results', - influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, - influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, - postgresUsername : 'postgres', - postgresPassword : 'uuinkks', - postgresDatabaseName : 'postgres', - postgresServerName : "\$${postgresHostName}", - postgresSsl : false, - postgresPort : '5432', - numWorkers : '5', - autoscalingAlgorithm : 'NONE' - ] + Map pipelineOptions = [ + tempRoot : 'gs://temp-storage-for-perf-tests', + project : 'apache-beam-testing', + runner : 'DataflowRunner', + numberOfRecords : '600000', + bigQueryDataset : 'beam_performance', + bigQueryTable : 'cdapioit_results', + influxMeasurement : 'cdapioit_results', + influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, + influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, + postgresUsername : 'postgres', + postgresPassword : 'uuinkks', + postgresDatabaseName : 'postgres', + postgresServerName : "\$${postgresHostName}", + postgresSsl : false, + postgresPort : '5432', + numWorkers : '5', + autoscalingAlgorithm : 'NONE' + ] - steps { - gradle { - rootBuildScriptDir(common.checkoutDir) - common.setGradleSwitches(delegate) - switches("--info") - switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'") - switches("-DintegrationTestRunner=dataflow") - tasks(":sdks:java:io:cdap:integrationTest --tests org.apache.beam.sdk.io.cdap.CdapIOIT") - } + steps { + gradle { + rootBuildScriptDir(common.checkoutDir) + common.setGradleSwitches(delegate) + switches("--info") + switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'") + switches("-DintegrationTestRunner=dataflow") + tasks(":sdks:java:io:cdap:integrationTest --tests org.apache.beam.sdk.io.cdap.CdapIOIT") } + } }