[BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat #4332
Conversation
chamikaramj left a comment
Thanks.
| .apply("Get values only", Values.<TestRowDBWritable>create()) | ||
| .apply("Values as string", ParDo.of(new SelectNameFn())) | ||
| .apply("Calculate hashcode", Combine.globally(new HashingFn())) | ||
| .apply(Reshuffle.<String>viaRandomKey()); |
Add a comment explaining why we need this reshuffle and add a link to the JIRA.
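For illustration, a minimal sketch of what such a comment could look like; the actual explanation and the JIRA id are not stated in this thread, so both are placeholders here:

    .apply("Calculate hashcode", Combine.globally(new HashingFn()))
    // Reshuffle is needed here because <reason goes here>.
    // Tracked in https://issues.apache.org/jira/browse/BEAM-XXXX (placeholder id).
    .apply(Reshuffle.<String>viaRandomKey());

The consolidated hash can then be checked with PAssert.thatSingleton(...) against a precomputed value, which is how this kind of verification is usually asserted in the Beam IO integration tests.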
    writePipeline.run().waitUntilFinish();

    PCollection<String> consolidatedHashcode = readPipeline
        .apply("Read using DBInputFormat", HadoopInputFormatIO
Probably rename to "Read using HadoopInputFormat", since that explains better what we are testing here.
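For context on what this read step configures, a hedged sketch of wiring HadoopInputFormatIO to Hadoop's DBInputFormat; the property names come from Hadoop's DBConfiguration and from the classes HadoopInputFormatIO expects in the Configuration, while the concrete connection values, tableName, readPipeline and TestRowDBWritable are assumptions based on this test rather than the exact code in the PR:

    import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

    Configuration conf = new Configuration();
    // JDBC connection details consumed by DBInputFormat.
    conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "org.postgresql.Driver");
    conf.set(DBConfiguration.URL_PROPERTY, "jdbc:postgresql://<host>:5432/postgres");
    conf.set(DBConfiguration.USERNAME_PROPERTY, "postgres");
    conf.set(DBConfiguration.PASSWORD_PROPERTY, "<password>");
    conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
    conf.set(DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class.getName());
    // Depending on the Hadoop version, input field names / an order-by column may also be needed.
    // Classes that HadoopInputFormatIO requires to be present in the Configuration.
    conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, Object.class);
    conf.setClass("key.class", LongWritable.class, Object.class);
    conf.setClass("value.class", TestRowDBWritable.class, Object.class);

    PCollection<KV<LongWritable, TestRowDBWritable>> rows = readPipeline.apply(
        "Read using HadoopInputFormat",
        HadoopInputFormatIO.<LongWritable, TestRowDBWritable>read().withConfiguration(conf));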
    public void writeThenReadUsingDBInputFormat() {
      writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
          .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
          .apply(JdbcIO.<TestRow>write()
Add a label, for example "Write using JDBCIO".
    }

    @Test
    public void writeThenReadUsingDBInputFormat() {
Rename to readUsingHadoopInputFormat()? (since the write part is done using JDBCIO).
      }
    }

    public static void cleanUpDataTable(DataSource dataSource, String tableName)
deleteTable ?
      return dataSource;
    }

    public static void createDataTable(DataSource dataSource, String tableName)
createTable ?
    public void writeThenReadUsingDBInputFormat() {
      writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
          .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
          .apply(JdbcIO.<TestRow>write()
Writing using JDBCIO is fine. Your pipeline is probably slow because all the writing is done from a single worker. Consider creating a PCollection of seed objects (splits), followed by a Reshuffle(), followed by the writing.
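A rough sketch of that suggestion, reusing the DeterministicallyConstructTestRowFn, numberOfRows and JdbcIO configuration already present in this test; the split count, the ExpandSplitFn helper, the insert statement and the statement-setter name are made up for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;

    // Hypothetical helper: expands a split's start index into rowsPerSplit consecutive row indices.
    static class ExpandSplitFn extends DoFn<Long, Long> {
      private final long rowsPerSplit;

      ExpandSplitFn(long rowsPerSplit) {
        this.rowsPerSplit = rowsPerSplit;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        for (long i = c.element(); i < c.element() + rowsPerSplit; i++) {
          c.output(i);
        }
      }
    }

    int numberOfSplits = 100;                           // assumed split count, tune for parallelism
    long rowsPerSplit = numberOfRows / numberOfSplits;  // assumes an even division

    List<Long> splitStarts = new ArrayList<>();
    for (long start = 0; start < numberOfRows; start += rowsPerSplit) {
      splitStarts.add(start);
    }

    writePipeline
        .apply("Create split seeds", Create.of(splitStarts))
        // Break fusion so each split is expanded and written by a different worker.
        .apply("Distribute seeds", Reshuffle.<Long>viaRandomKey())
        .apply("Expand splits", ParDo.of(new ExpandSplitFn(rowsPerSplit)))
        .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
        .apply("Write using JDBCIO", JdbcIO.<TestRow>write()
            .withDataSourceConfiguration(dataSourceConfiguration)  // as configured in the test
            .withStatement(String.format("insert into %s values(?, ?)", tableName))  // assumed two-column TestRow schema
            .withPreparedStatementSetter(testRowPreparedStatementSetter));  // the setter the test already uses (placeholder name)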
@chamikaramj thank you for your suggestions. @jbonofre @iemejia could you also take a look? I also added the io-it-suite-local profile that was missing, and a Jenkins job definition.
I added only the reshuffle and it seems to help a little. I didn't optimise further because of a problem: a different "consolidatedHash" gets calculated on each test run for datasets bigger than 600,000 rows. This makes it impossible to determine a stable hash for a large-scale dataset (e.g. 40,000,000 rows), even though the number of rows read and written is the same. I see the same problem when running JdbcIOIT on larger datasets, and as far as I checked, the database content is correct. I can create a JIRA for this after you review the PR and agree that the behavior is odd, ok? 600,000 rows is approximately 160 MB. I wouldn't call that a large-scale test, but I think it is something we can start with and then increase the scale and optimize gradually, if needed and possible (e.g. after tackling the hash calculation problem I described). What do you think?
    <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format</name>
    <description>IO to read data from data sources which implement Hadoop Input Format.</description>

    <profiles>
For now I simply duplicated the code from jdbc io, applying necessary changes. I think it can be improved later, see: https://issues.apache.org/jira/browse/BEAM-3440
chamikaramj left a comment
Thanks. LGTM
      }
    }

    public static void cleanUpDataTable(DataSource dataSource, String tableName)
chamikaramj wrote:
deleteTable ?
Done.
    }

    @Test
    public void writeThenReadUsingDBInputFormat() {
chamikaramj wrote:
Rename to readUsingHadoopInputFormat()? (since the write part is done using JDBCIO).
Done.
      return dataSource;
    }

    public static void createDataTable(DataSource dataSource, String tableName)
chamikaramj wrote:
createTable ?
Done.
    public void writeThenReadUsingDBInputFormat() {
      writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
          .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
          .apply(JdbcIO.<TestRow>write()
chamikaramj wrote:
Writing using JDBCIO is fine. Your pipeline is probably slow because all the writing is done from a single worker. Consider creating a PCollection of seed objects (splits), followed by a Reshuffle(), followed by the writing.
Done.
    writePipeline.run().waitUntilFinish();

    PCollection<String> consolidatedHashcode = readPipeline
        .apply("Read using DBInputFormat", HadoopInputFormatIO
chamikaramj wrote:
Probably rename to "Read using HadoopInputFormat", since that explains better what we are testing here.
Done.
| .apply("Get values only", Values.<TestRowDBWritable>create()) | ||
| .apply("Values as string", ParDo.of(new SelectNameFn())) | ||
| .apply("Calculate hashcode", Combine.globally(new HashingFn())) | ||
| .apply(Reshuffle.<String>viaRandomKey()); |
chamikaramj wrote:
Add a comment explaining why we need this reshuffle and add a link to the JIRA.
Done.
Might be worth filing a JIRA for the JDBC issue in case it's a bug in the sink. I'm fine with enabling this test for a smaller dataset and increasing the size later after fixes.
Run seed job

Run Java HadoopInputFormatIO Performance Test

Any idea why the following failed? https://builds.apache.org/job/beam_PerformanceTests_HadoopInputFormatIO_IT/1/console
cef5e27 to 7b2d06b
Run seed job

Run Java HadoopInputFormatIO Performance Test

1 similar comment

Run Java HadoopInputFormatIO Performance Test
7b2d06b to c27f1a5
Run seed job

Run Java HadoopInputFormatIO Performance Test

2 similar comments

Run Java HadoopInputFormatIO Performance Test

Run Java HadoopInputFormatIO Performance Test
@chamikaramj Judging from the logs you attached, either the kubeconfig's location is wrong or postgres.yml's is. In the new commit I tried to overwrite the kubeconfig's location with the value I got from our local Jenkins setup, hoping that it is some default value. No luck. I think someone with access to Jenkins is needed - we need to know the path to the kubeconfig to set it up correctly.
c27f1a5 to 67cff14
Run seed job

Run Java HadoopInputFormatIO Performance Test
I also tried setting a different path to the kubernetes scripts, analogous to the one used in the JDBC tests (https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_JDBC/215/console). Also no luck, because I got a "permission denied" error even earlier, before the path even mattered (https://builds.apache.org/job/beam_PerformanceTests_HadoopInputFormatIO_IT/16/console). @jbonofre can you help in diagnosing what is going on?
The kubernetes infrastructure that is needed for the jenkins job to run is not available for now. We should add it once the infrastructure is there.
Removed the Jenkins job due to the reasons described in pull request #4392. The job should be added in a separate PR after the problems are solved. @chamikaramj could you take a look?
Can you merge and resolve conflicts? LGTM other than that.
ee7400e to d502078
LGTM. Thanks.
This broke the Gradle build:
[BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat (apache#4332): The kubernetes infrastructure that is needed for the jenkins job to run is not available for now. We should add it once the infrastructure is there.
@chamikaramj could you take a look? What do you think about the idea of using JdbcIO to write the data? There is no write in HadoopInputFormatIO.
Currently the test works only for small datasets (100,000 rows, which is about 3.43 MB). For 1,000,000 rows the test is flaky (different hashes get generated). I'm investigating the cause now. After that I'm planning to add a hash for a large dataset.
Also, one last thing: large-scale scenarios (e.g. 40,000,000 rows, which is 10 GB according to Dataflow's estimation) can take quite a long time to run. The write pipeline alone executes for more than 35 minutes. JdbcIO.write() seems to be the bottleneck, as it is done sequentially: one-row inserts, one after another. I think grouping rows and then inserting them into the database in batches would speed things up, though I don't know by how much. Should I do this, or is the execution time bearable? Are there other optimisations I might want to consider?
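For what it's worth, a rough sketch of the batching idea as a plain DoFn that buffers rows and flushes them with JDBC batch statements; the connection handling, column mapping, batch size and constructor arguments are simplified assumptions rather than code from this PR, and whether it actually beats one-row inserts would need measuring:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import org.apache.beam.sdk.io.common.TestRow;
    import org.apache.beam.sdk.transforms.DoFn;

    /** Buffers TestRows and writes them with PreparedStatement batches instead of one-row inserts. */
    static class BatchedJdbcWriteFn extends DoFn<TestRow, Void> {
      private static final int BATCH_SIZE = 1000;  // hypothetical batch size

      private final String jdbcUrl;    // connection details would come from the test options
      private final String user;
      private final String password;
      private final String insertSql;  // e.g. "insert into <table> values(?, ?)"

      private transient Connection connection;
      private transient PreparedStatement statement;
      private int bufferedRows;

      BatchedJdbcWriteFn(String jdbcUrl, String user, String password, String insertSql) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
        this.insertSql = insertSql;
      }

      @Setup
      public void setup() throws SQLException {
        connection = DriverManager.getConnection(jdbcUrl, user, password);
        statement = connection.prepareStatement(insertSql);
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws SQLException {
        statement.setInt(1, c.element().id());       // assumes TestRow exposes id() and name()
        statement.setString(2, c.element().name());
        statement.addBatch();
        if (++bufferedRows >= BATCH_SIZE) {
          flush();
        }
      }

      @FinishBundle
      public void finishBundle() throws SQLException {
        flush();  // write out any partially filled batch at the end of the bundle
      }

      @Teardown
      public void teardown() throws SQLException {
        if (statement != null) {
          statement.close();
        }
        if (connection != null) {
          connection.close();
        }
      }

      private void flush() throws SQLException {
        if (bufferedRows > 0) {
          statement.executeBatch();
          bufferedRows = 0;
        }
      }
    }

It could be applied after the row-producing ParDo in place of JdbcIO.write(), for example .apply("Write in batches", ParDo.of(new BatchedJdbcWriteFn(url, user, password, insertSql))), though keeping JdbcIO and adding batch support there would be the cleaner long-term option.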
PS: Happy new year! 🎊 :)