Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {

/* Options for test pipeline for file-based I/O in 'sdks/java/io/file-based-io-tests/'. */
@Description("Number records that will be written and read by the test")
@Default.Long(100000)
Long getNumberOfRecords();
@Default.Integer(100000)
Integer getNumberOfRecords();

void setNumberOfRecords(Long count);
void setNumberOfRecords(Integer count);

@Description("Destination prefix for files generated by the test")
@Validation.Required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public void processElement(ProcessContext c) {
* the name() for the rows generated from seeds in [0, n).
*/
private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
1000, "7d94d63a41164be058a9680002914358"
1000, "7d94d63a41164be058a9680002914358",
100_000, "c7cbddb319209e200f1c5eebef8fe960",
5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class AvroIOIT {
+ "}");

private static String filenamePrefix;
private static Long numberOfTextLines;
private static Integer numberOfTextLines;

@Rule
public TestPipeline pipeline = TestPipeline.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public static String appendTimestampToPrefix(String filenamePrefix) {
return String.format("%s_%s", filenamePrefix, new Date().getTime());
}

public static String getExpectedHashForLineCount(Long lineCount) {
Map<Long, String> expectedHashes = ImmutableMap.of(
100_000L, "4c8bb3b99dcc59459b20fefba400d446",
1_000_000L, "9796db06e7a7960f974d5a91164afff1",
100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
public static String getExpectedHashForLineCount(int lineCount) {
Map<Integer, String> expectedHashes = ImmutableMap.of(
100_000, "4c8bb3b99dcc59459b20fefba400d446",
1_000_000, "9796db06e7a7960f974d5a91164afff1",
100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
);

String hash = expectedHashes.get(lineCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;

import java.text.ParseException;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
Expand Down Expand Up @@ -65,14 +64,14 @@
public class TextIOIT {

private static String filenamePrefix;
private static Long numberOfTextLines;
private static Integer numberOfTextLines;
private static Compression compressionType;

@Rule
public TestPipeline pipeline = TestPipeline.create();

@BeforeClass
public static void setup() throws ParseException {
public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();

numberOfTextLines = options.getNumberOfRecords();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;

import java.text.ParseException;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TFRecordIO;
Expand Down Expand Up @@ -67,7 +66,7 @@
public class TFRecordIOIT {

private static String filenamePrefix;
private static Long numberOfTextLines;
private static Integer numberOfTextLines;
private static Compression compressionType;

@Rule
Expand All @@ -77,7 +76,7 @@ public class TFRecordIOIT {
public TestPipeline readPipeline = TestPipeline.create();

@BeforeClass
public static void setup() throws ParseException {
public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();

numberOfTextLines = options.getNumberOfRecords();
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/io/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<executable>${python.interpreter.bin}</executable>
<arguments>
<argument>${pkbLocation}</argument>
<argument>-beam_it_timeout=1800</argument>
<argument>-benchmarks=beam_integration_benchmark</argument>
<argument>-beam_it_profile=io-it</argument>
<argument>-beam_location=${beamRootProjectDir}</argument>
Expand Down Expand Up @@ -204,6 +205,7 @@
<executable>${python.interpreter.bin}</executable>
<arguments>
<argument>${pkbLocation}</argument>
<argument>-beam_it_timeout=1800</argument>
<argument>-benchmarks=beam_integration_benchmark</argument>
<argument>-beam_it_profile=io-it</argument>
<argument>-beam_location=${beamRootProjectDir}</argument>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
Expand All @@ -56,7 +53,8 @@
* "--postgresUsername=postgres",
* "--postgresDatabaseName=myfancydb",
* "--postgresPassword=mypass",
* "--postgresSsl=false" ]'
* "--postgresSsl=false",
* "--numberOfRecords=1000" ]'
* </pre>
*
* <p>If you want to run this with a runner besides directrunner, there are profiles for dataflow
Expand All @@ -65,8 +63,8 @@
*/
@RunWith(JUnit4.class)
public class JdbcIOIT {
private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class);
public static final int EXPECTED_ROW_COUNT = 1000;

private static int numberOfRows;
private static PGSimpleDataSource dataSource;
private static String tableName;

Expand All @@ -81,14 +79,14 @@ public static void setup() throws SQLException, ParseException {
IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
.as(IOTestPipelineOptions.class);

numberOfRows = options.getNumberOfRecords();
dataSource = getDataSource(options);

tableName = JdbcTestHelper.getTableName("IT");
JdbcTestHelper.createDataTable(dataSource, tableName);
}

private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
throws SQLException {
private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) {
PGSimpleDataSource dataSource = new PGSimpleDataSource();

dataSource.setDatabaseName(options.getPostgresDatabaseName());
Expand Down Expand Up @@ -124,7 +122,7 @@ public void testWriteThenRead() {
* the database.)
*/
private void runWrite() {
pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT))
pipelineWrite.apply(GenerateSequence.from(0).to(numberOfRows))
.apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
.apply(JdbcIO.<TestRow>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
Expand Down Expand Up @@ -162,13 +160,13 @@ private void runRead() {

PAssert.thatSingleton(
namesAndIds.apply("Count All", Count.<TestRow>globally()))
.isEqualTo((long) EXPECTED_ROW_COUNT);
.isEqualTo((long) numberOfRows);

PCollection<String> consolidatedHashcode = namesAndIds
.apply(ParDo.of(new TestRow.SelectNameFn()))
.apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode)
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT));
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));

PCollection<List<TestRow>> frontOfList =
namesAndIds.apply(Top.<TestRow>smallest(500));
Expand All @@ -178,8 +176,7 @@ private void runRead() {
PCollection<List<TestRow>> backOfList =
namesAndIds.apply(Top.<TestRow>largest(500));
Iterable<TestRow> expectedBackOfList =
TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500,
EXPECTED_ROW_COUNT);
TestRow.getExpectedValues(numberOfRows - 500, numberOfRows);
PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);

pipelineRead.run().waitUntilFinish();
Expand Down