Skip to content

Conversation

@scwhittle
Copy link
Contributor

watermarks using commit time, allow specifying the
number of partitions instead of requiring launcher
to have access to kafka cluster, and removing
unneeded deserialization of keys.

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status Build Status Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

watermarks using commit time, allow specifying the
number of partitions instead of requiring launcher
to have access to kafka cluster, and removing
unneeded deserialization of keys.
@scwhittle
Copy link
Contributor Author

R: @aromanenko-dev I see you added this with #5019, would you be able to review?
In particular, do you think using kafka create time for nexmark event timestamps should be the new default (for reading/writing) or just optional?
Without these changes populating a topic by running query 0 and then reading the topic for other queries has incorrect watermark semantics, given that nexmark uses historical events by default.

@reuvenlax FYI

@aromanenko-dev
Copy link
Contributor

@scwhittle Thanks! Let take a look on this

@aromanenko-dev
Copy link
Contributor

Sorry for delay with a review.
It LGTM, I think we should stick with create time for Nexmark with Kafka.
A quick question - how did you test it?

@scwhittle
Copy link
Contributor Author

Thanks Alexey! I made this change after generating kafka data and then consuming it with a dataflow pipeline showed the watermark was around realtime despite events being historical.

After these changes, generating the output and consuming it running the pipeline on the dataflow runner had correct watermarks in the past.

I ran the following command to generate kafka data (using dataflow runner)
./gradlew :sdks:java:testing:nexmark:run -Pnexmark.runner=":runners:google-cloud-dataflow-java" -Pnexmark.args="--exportSummaryToBigQuery=false --resourceNameMode=VERBATIM --runner=DataflowRunner --project=${PROJECT?:} --region=${REGION?:} --gcpTempLocation=${TEMP_LOCATION?:} --streaming=true --manageResources=false --monitorJobs=false --sourceType=KAFKA --pubSubMode=PUBLISH_ONLY --bootstrapServers=${BOOTSTRAP_SERVER?:} --kafkaTopic=${KAFKA_TOPIC?:} --query=0 --sinkType=KAFKA --numEvents=${NUM_EVENTS?:} --numWorkers=20"

And the following is a sample command to run a query consuming from kafka:
./gradlew :sdks:java:testing:nexmark:run -Pnexmark.runner=":runners:google-cloud-dataflow-java" -Pnexmark.args="--exportSummaryToBigQuery=false --resourceNameMode=QUERY_RUNNER_AND_MODE --runner=DataflowRunner --streaming=true --manageResources=false --monitorJobs=true --sourceType=KAFKA --sinkType=COUNT_ONLY --pubSubMode=SUBSCRIBE_ONLY --bootstrapServers=${BOOTSTRAP_SERVER?:} --kafkaTopic=${KAFKA_TOPIC?:} --query=1 --project=${PROJECT?:} --tempLocation=${TEMP_LOCATION?:} --enableStreamingEngine --cancelStreamingJobAfterFinish --numKafkaTopicPartitions=${NUM_PARTITIONS?:} --numWorkers=4 --autoscalingAlgorithm=NONE"

@aromanenko-dev aromanenko-dev merged commit bfdb3e5 into apache:master Dec 21, 2020
@aromanenko-dev
Copy link
Contributor

@scwhittle Thanks! In the future, it would be great to run Kafka-based Nexmark regularly and add it to the Metrics charts

@scwhittle scwhittle deleted the nexmark_kafka branch April 16, 2021 08:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants