@@ -150,7 +150,7 @@ public static void createTableWithStatement(DataSource dataSource, String stmt)
   public static ArrayList<KV<Integer, String>> getTestDataToWrite(long rowsToAdd) {
     ArrayList<KV<Integer, String>> data = new ArrayList<>();
     for (int i = 0; i < rowsToAdd; i++) {
-      KV<Integer, String> kv = KV.of(i, "Test");
+      KV<Integer, String> kv = KV.of(i, TestRow.getNameForSeed(i));
       data.add(kv);
     }
     return data;
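A note on the hunk above: the helper now emits a distinct, seed-derived value per key instead of the constant "Test". Below is a minimal standalone sketch of the resulting behavior, assuming TestRow is Beam's org.apache.beam.sdk.io.common.TestRow test utility (a test-jar dependency) whose getNameForSeed derives a deterministic name from its seed; the class name SeedNamingSketch and the row count 3 are illustrative only.

import java.util.ArrayList;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.values.KV;

public class SeedNamingSketch {
  // Mirrors the patched helper: the value of each KV is derived from its key.
  static ArrayList<KV<Integer, String>> getTestDataToWrite(long rowsToAdd) {
    ArrayList<KV<Integer, String>> data = new ArrayList<>();
    for (int i = 0; i < rowsToAdd; i++) {
      data.add(KV.of(i, TestRow.getNameForSeed(i)));
    }
    return data;
  }

  public static void main(String[] args) {
    // Each row now carries a unique, reproducible value, so a read-side check
    // can verify row contents rather than only counting rows.
    getTestDataToWrite(3)
        .forEach(kv -> System.out.println(kv.getKey() + " -> " + kv.getValue()));
  }
}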
@@ -25,7 +25,6 @@
 
 import com.google.cloud.Timestamp;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -51,8 +50,8 @@
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -62,6 +61,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -96,8 +96,6 @@
 @RunWith(JUnit4.class)
 public class JdbcIOIT {
 
-  // the number of rows written to table in normal integration tests (not the performance test).
-  private static final int EXPECTED_ROW_COUNT = 1000;
   private static final String NAMESPACE = JdbcIOIT.class.getName();
   // the number of rows written to table in the performance test.
   private static int numberOfRows;
@@ -117,6 +115,7 @@ public static void setup() {
     }
     org.junit.Assume.assumeNotNull(options);
     numberOfRows = options.getNumberOfRecords();
+
     dataSource = DatabaseTestHelper.getPostgresDataSource(options);
     tableName = DatabaseTestHelper.getTestTableName("IT");
     settings =
@@ -137,7 +136,7 @@ public void testWriteThenRead() throws SQLException {
     try {
       PipelineResult writeResult = runWrite();
       PipelineResult.State writeState = writeResult.waitUntilFinish();
-      PipelineResult readResult = runRead();
+      PipelineResult readResult = runRead(tableName);
       PipelineResult.State readState = readResult.waitUntilFinish();
       gatherAndPublishMetrics(writeResult, readResult);
       // Fail the test if pipeline failed.
@@ -234,7 +233,10 @@ private PipelineResult runWrite() {
    * verify that their values are correct. Where first/last 500 rows is determined by the fact that
    * we know all rows have a unique id - we can use the natural ordering of that key.
    */
-  private PipelineResult runRead() {
+  private PipelineResult runRead(String tableName) {
+    if (tableName == null) {
+      tableName = JdbcIOIT.tableName;
+    }
     PCollection<TestRow> namesAndIds =
         pipelineRead
             .apply(
@@ -382,9 +384,15 @@ public void testWriteWithWriteResults() throws Exception {
     String firstTableName = DatabaseTestHelper.getTestTableName("JDBCIT_WRITE");
     DatabaseTestHelper.createTable(dataSource, firstTableName);
     try {
-      ArrayList<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
 
-      PCollection<KV<Integer, String>> dataCollection = pipelineWrite.apply(Create.of(data));
+      PCollection<KV<Integer, String>> dataCollection =
+          pipelineWrite
+              .apply(GenerateSequence.from(0).to(numberOfRows))
+              .apply(
+                  FlatMapElements.into(
+                          TypeDescriptors.kvs(
+                              TypeDescriptors.integers(), TypeDescriptors.strings()))
+                      .via(num -> getTestDataToWrite(1)));
       PCollection<JdbcTestHelper.TestDto> resultSetCollection =
           dataCollection.apply(
               getJdbcWriteWithReturning(firstTableName)
Expand All @@ -397,16 +405,12 @@ public void testWriteWithWriteResults() throws Exception {
                       }));
       resultSetCollection.setCoder(JdbcTestHelper.TEST_DTO_CODER);
 
-      List<JdbcTestHelper.TestDto> expectedResult = new ArrayList<>();
-      for (int id = 0; id < EXPECTED_ROW_COUNT; id++) {
-        expectedResult.add(new JdbcTestHelper.TestDto(id));
-      }
-
-      PAssert.that(resultSetCollection).containsInAnyOrder(expectedResult);
+      PAssert.that(resultSetCollection.apply(Count.globally()))
+          .containsInAnyOrder(Long.valueOf(numberOfRows));
 
       pipelineWrite.run().waitUntilFinish();
 
-      assertRowCount(dataSource, firstTableName, EXPECTED_ROW_COUNT);
+      assertRowCount(dataSource, firstTableName, numberOfRows);
     } finally {
       DatabaseTestHelper.deleteTable(dataSource, firstTableName);
     }
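For context on the testWriteWithWriteResults hunks above: the test now generates its input inside the pipeline with GenerateSequence instead of materializing an ArrayList on the launcher and passing it to Create.of, so the input scales with the configured numberOfRecords. Below is a standalone sketch of that pattern, not the PR's exact code: GenerateRowsSketch, the hardcoded row count, and the "row-" value prefix are placeholders, and MapElements stands in for the test's FlatMapElements-plus-helper combination.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class GenerateRowsSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    long numberOfRows = 1000; // placeholder; the IT reads this from pipeline options

    // Rows are produced by the runner, in parallel, rather than shipped
    // from the launcher as an in-memory list.
    PCollection<KV<Integer, String>> rows =
        p.apply(GenerateSequence.from(0).to(numberOfRows))
            .apply(
                MapElements.into(
                        TypeDescriptors.kvs(
                            TypeDescriptors.integers(), TypeDescriptors.strings()))
                    .via(seed -> KV.of(seed.intValue(), "row-" + seed)));

    // In the integration test, a collection like `rows` then feeds JdbcIO.write().
    p.run().waitUntilFinish();
  }
}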