[BEAM-160] Port 'NexMark Queries' to Beam for use as integration test #3114
R: @jbonofre
R: -@dhalperi
      return options.isStreaming();
    }

    /**
Why are these hard-coded?
This option lets the user choose whether to run the pipelines in streaming mode or in batch mode. If streaming=true is set in the options, the first step of the pipelines is Read.from(new UnboundedEventSource); otherwise it is Read.from(new BoundedEventSource).
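To make the selection concrete, here is a minimal sketch in plain Java. It is only an illustration: the EventSource interface, the two source classes and sourceFor() are hypothetical stand-ins written for this comment, not the real Beam or Nexmark types, and the real pipelines pass the chosen source to Read.from(...).

```java
public class SourceSelectionSketch {
    /** Hypothetical stand-in for a source type; not the real Beam API. */
    public interface EventSource {
        boolean isBounded();
    }

    /** Stand-in for the batch source: a finite set of events. */
    public static final class BoundedEventSource implements EventSource {
        @Override
        public boolean isBounded() {
            return true;
        }
    }

    /** Stand-in for the streaming source: an endless stream of events. */
    public static final class UnboundedEventSource implements EventSource {
        @Override
        public boolean isBounded() {
            return false;
        }
    }

    /** Picks the source for the first pipeline step from the streaming flag. */
    public static EventSource sourceFor(boolean streaming) {
        return streaming ? new UnboundedEventSource() : new BoundedEventSource();
    }

    public static void main(String[] args) {
        System.out.println("streaming=true  -> bounded: " + sourceFor(true).isBounded());
        System.out.println("streaming=false -> bounded: " + sourceFor(false).isBounded());
    }
}
```

The point of the sketch is that the flag is read once, when the pipeline is built, and everything downstream sees only the source that was chosen.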
Sorry for the unclear comment; I was referring to the coresPerWorker and maxNumWorkers() functions
I think they are leftovers: coresPerWorker is never used, so I will remove it. maxNumWorkers() is only used in query 10 (write event logs to Google Cloud Storage) to define the number of shards.
I will start a discussion on the ML about whether we should keep query 10 as it is (with Google specifics, since the aim was to test a specific Google customer use case) or use a more generic data storage technology (Beam filesystems). If we switch, the configuration parameter is likely to disappear as well. But I think this refactoring of query 10 does not block the PR; it could be done afterwards.
     */
    private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
      builder.build(options);
      // throw new UnsupportedOperationException(
Delete commented code if it's not needed
Yes. The PR in general needs a big cleanup :)
     * If monitoring, wait until the publisher pipeline has run long enough to establish
     * a backlog on the Pubsub topic. Otherwise, return immediately.
     */
    private void waitForPublisherPreload() {
Consider just deleting this along with the other preload-related functionality, since it doesn't work in this PR
+1
I only kept conf.preloadSeconds, which is used to account for initialization time in the delay before cancelling the job.
      sinkEventsToAvro(source);
    }

    // Special hacks for Query 10 (big logger).
Add more comments about what the hacks are and why they are needed
ok
     * Setup pipeline with codes and some other options.
     */
    public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
      //TODO Ismael check
The new direct runner doesn't support disabling randomization at all, so this can be deleted. Do the tests pass on DirectRunner without it?
Yes, all the queries and all the unit tests pass on the direct runner in both batch and streaming mode even when randomization is not disabled, so I can safely remove this piece of code.
    /**
     * Return a transform to write given number of bytes to durable store on every record.
     */
    public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) {
This can use the State API now
Yes, forgot that one, thanks!
    public void encode(Event value, OutputStream outStream)
        throws CoderException, IOException {
      if (value.newPerson != null) {
        INT_CODER.encode(0, outStream);
Put the tag values in an enum
Missed that one also, thanks!
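For illustration, the enum suggestion could look roughly like the sketch below. It is a sketch under assumptions, not the code from this PR: only the value 0 for a new person appears in the snippet above, so the tag names and the values 1 and 2 are hypothetical, and a fixed 4-byte int written with ByteBuffer stands in for whatever INT_CODER actually writes.

```java
import java.nio.ByteBuffer;

public class EventTagSketch {
    /** Hypothetical tags: one per kind of event, replacing the magic numbers. */
    public enum Tag {
        PERSON(0),   // 0 is the value used for newPerson in the reviewed snippet
        AUCTION(1),  // assumed value
        BID(2);      // assumed value

        private final int value;

        Tag(int value) {
            this.value = value;
        }

        public int value() {
            return value;
        }

        /** Maps a decoded int back to its tag, rejecting unknown values. */
        public static Tag fromValue(int value) {
            for (Tag tag : values()) {
                if (tag.value == value) {
                    return tag;
                }
            }
            throw new IllegalArgumentException("Unknown event tag: " + value);
        }
    }

    /** Writes the tag as a big-endian 4-byte int (a simplification of the real coder). */
    public static byte[] encodeTag(Tag tag) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(tag.value()).array();
    }

    /** Reads the 4-byte int back and resolves it to a tag. */
    public static Tag decodeTag(byte[] encoded) {
        return Tag.fromValue(ByteBuffer.wrap(encoded).getInt());
    }

    public static void main(String[] args) {
        for (Tag tag : Tag.values()) {
            System.out.println(tag + " round-trips to " + decodeTag(encodeTag(tag)));
        }
    }
}
```

With this shape, the encode and decode paths share one definition of each tag, and an unexpected value fails loudly in fromValue() instead of falling through an if/else chain.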
@dpmills thanks for your comments, I addressed all of them, PTAL.

2c7b50b to 3d0617c

LGTM. I'm not a committer, however, so over to @dhalperi for committing.

I don't know how to merge this. Too many commits, too many authors, request not to squash. This is going to really pollute history if merged as-is, but that is quite possibly by design. I'll let @iemejia do it, he's a committer :)

Additionally, I believe there are as-yet-unresolved issues about module names and paths of code. R: @davorbonaci for some input.

@dhalperi I will work with @echauchot to reduce the number of commits. I think we can still squash some of the latest ones (those that addressed the review comments and made the last fixes before the PR). Keep in mind that this is long-running work by three different authors, so a reasonable number of commits is normal. I would not consider this pollution: there are no intentionally tiny commits and most of them are separate by design, but we will still see how to reduce them.

For reference, @davorbonaci, the last point Dan mentions is the paths. Currently Nexmark uses the original path that Mark had for the PR: integration/java/nexmark. This introduces a new directory structure for integration tests on Beam, which I think can make sense and could even host part of the things @ssisk is working on.

3d0617c to e01fd05

@dhalperi we managed to reduce the number of commits from 55 to 27: we wanted to keep some big refactorings and some big subjects (like fixing and improving the queries) separate. I just pushed the branch. WDYT?

e01fd05 to 6232a61
davorbonaci left a comment
Very nice! Left a few comments.
    @@ -0,0 +1,282 @@
    <!--
sdks/java/nexmark?
It would be great to have it in the SDK! I am in favor of it. My only question is this: Nexmark is more of a benchmarking suite than an API or framework. Is it a problem to have software in the sdk module that is not used to build software?
    under the License.
    -->

    # Running NexMark on Beam on Flink on Google Compute Platform
Move to the website; contribution section?
Thanks for pointing that out! Actually it is a leftover; I forgot that file. I propose to simply remove it, because nobody has tested running Nexmark that way since Mark did. More importantly, I would rather document running NexMark on a standard Flink cluster, similar to what the README describes for a Spark cluster.
integration/java/nexmark/README.md
Outdated
    under the License.
    -->

    # NEXMark integration suite
website?
I guess you mean having this content in both beam website and the readme, right?
OK, what I propose is to add a new section to the website with a short introduction to Nexmark, similar to the talk we gave at ApacheCon, plus a reference to the README (once it is merged) to avoid duplicating information.
Is that OK with you?
integration/java/nexmark/pom.xml
Outdated
      <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>beam-integration-java-nexmark</artifactId>
beam-sdks-java-nexmark
integration/java/nexmark/pom.xml
Outdated
    <parent>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-integration-java-parent</artifactId>
beam-sdks-java-parent
+1, modulo the question I raised on your first comment
    /**
     * Nexmark.
     */
    package org.apache.beam.integration.nexmark;
Please double-check the output of the aggregated javadoc. Nexmark packages shouldn't be there.
ok
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.beam.integration.nexmark.queries;
org.apache.beam.sdk.nexmark.XXX (across the board)
pom.xml
Outdated
    <module>sdks</module>
    <module>runners</module>
    <module>examples</module>
    <module>integration</module>
revert?
integration/pom.xml
Outdated
    @@ -0,0 +1,51 @@
    <?xml version="1.0" encoding="UTF-8"?>
revert?
integration/java/pom.xml
Outdated
    @@ -0,0 +1,37 @@
    <?xml version="1.0" encoding="UTF-8"?>
revert?
Thanks @davorbonaci for your comments! I'll be in meetings today, but I'll address them ASAP.

45206f6 to 8f23001

Davor, thanks for your review, I have addressed your comments, see the last 4 commits. I have also rebased on master. I will squash all the review comments into

Is this PR blocked on something right now?
Update configuration of events generation to add some variation Update execution matrix (issue #45)
85d743f to b3d1f81

@jbonofre moved to 2.2.0-SNAPSHOT and rebased on master

@davorbonaci, any more concerns with this PR?
davorbonaci left a comment
LGTM
Sorry for my delay here. At this point, I think this should just be merged. I do think there are open questions such as whether we release this module to Maven Central, does the code belong to the regular jar or test-jar, etc.
Regardless of those questions, please move forward and we'll figure things out, as needed.
b3d1f81 to 4a6c9f0

I'm very glad to see nexmark move forward, thanks! I'm doing a final rebase of some commits.

4a6c9f0 to 6d068f7

75a4ae5 to 7aa698c
…ation code - Use state API in NexmarkUtils.diskBusy() - Remove commented code for direct runner randomization disabling: direct runner no more allows disabling randomization and queries and UT pass
Clean pom, exclude nexmark packages from aggregated javadoc, put spark logs in WARN Update execution matrix in README: Flink termination of streaming pipelines is now ok as far as Nexmark is concerned Remove how to run Nexmark on Flink on Google Cloud Platform from README Update command lines in README after moving nexmark from integration to sdks module
7aa698c to d83bd07

Thanks for all your great work on this Etienne and Ismaël! Happy to see it closed at last. Best,

No, the big credit goes to you, @mshields822. This would not have been possible at all without your work; we just pushed it through the last stretch, you made the long journey. Thanks for the support and of course for your nice creation.

I could not have said it better :)
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
Make sure the PR title is formatted like: [BEAM-<Jira issue #>] Description of pull request
Make sure tests pass via mvn clean verify.
Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
If this contribution is large, please file an Apache Individual Contributor License Agreement.
R: @dpmills @dhalperi
CC: @stasl @aviemzur @aljoscha because we discussed NexMark together :)
CC: @mshields822 I know you do not work on it anymore, but you might be interested :)
CC: @ssisk for reflection on IT tests
This is a port of the NexMark queries to Beam, to be used as integration tests.
It can also be used for A/B testing (no-regression checks or performance comparison between two versions of the same engine or of the same runner).
This is a continuation of the previous PRs (#99 and #366) from Mark Shields.
The code has changed quite a bit: some queries have changed to use new Beam APIs and there were some big refactorings. More importantly, we can now run all the queries on all the runners. Nevertheless, there are still some open issues in Nexmark (https://github.com/iemejia/beam/issues) and in Beam upstream (see issue links in https://issues.apache.org/jira/browse/BEAM-160)
Here is a doc that presents the NexMark components and pseudocode of the queries to ease the review: https://drive.google.com/open?id=1VgnGiVu8vSfm7Et-xAtQYv0PlEpqeyfmhpQUNPmWRJs
Everything needed to launch the queries is in the README. There is also a support matrix for the runners.
Please do not squash commits because there are several authors: Mark, Ismaël and me.
Good review :)!