Skip to content

Conversation

@vectorijk
Copy link
Contributor

Add support for Kafka as source/sink on Nexmark

@vectorijk
Copy link
Contributor Author

R: @iemejia @echauchot

@echauchot
Copy link
Contributor

Thanks for your help @vectorijk ! I'll take a look as soon as I'm done with my current PR.

Copy link
Member

@iemejia iemejia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome job, thanks a lot for working on it, I commented on some small fixes. Can you also please add some documentation on how to use Nexmark with Kafka in the Nexmark README. (We are moving this eventually to the web site but we will do this later).

<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<exclusions>
<exclusion>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this exclusion is not needed. Can you remove it if so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed it

}));
}

static final ParDo.SingleOutput<Event, byte[]> EVENT_TO_BYTEARRAY =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private, maybe make the return type DoFn instead of ParDo and apply the ParDo in the other part to make it more Beam style, (and yes I know nexmark does not do this in other parts shame on us :P).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}


static final ParDo.SingleOutput<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as with the other ParDo (private + Fn).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* Send {@code events} to Kafka.
*/
private void sinkEventsToKafka(PCollection<Event> events) {
if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change for Preconditions.checkArgument

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually unneeded because this check is already done in KafkaIO.Write.validate. You can remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it.

/**
* Send {@code formattedResults} to Kafka.
*/
private void sinkResultsToKafka(PCollection<String> formattedResults) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change for Preconditions.checkArgument

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

/**
* Send {@code events} to Kafka.
*/
private void sinkEventsToKafka(PCollection<Event> events) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently unused, can you add something similar to Pubsub's PUBLISH_ONLY so users can generate and record the events in Kafka.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, it might be useful as well to inject events to the benchmark using kafka similarly to what is done in Pusb/Sub COMBINED mode.

@vectorijk
Copy link
Contributor Author

retest this please

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vectorijk, thanks for your work!
Besides I have tested your PR by launching query1 in batch mode in direct runner with a kafka sink to a local kafka instance through this command
mvn -Pdirect-runner exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main "-Dexec.args=--sinkType=KAFKA --bootstrapServers=127.0.0.1:9092 --runner=DirectRunner --query=1 --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
And I got this error:

GRAVE: Uncaught error in kafka producer I/O thread: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1702391137, only 37 bytes available
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
	at java.lang.Thread.run(Thread.java:745)

Maybe related to the fact that you use PCollection<String> formattedResults and apply to it a KafkaIO.<Long, String> with keySerializer whereas there is no key in the PCollection

}));
}

static final ParDo.SingleOutput<Event, byte[]> EVENT_TO_BYTEARRAY =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

/**
* Send {@code events} to Kafka.
*/
private void sinkEventsToKafka(PCollection<Event> events) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, it might be useful as well to inject events to the benchmark using kafka similarly to what is done in Pusb/Sub COMBINED mode.

* Send {@code events} to Kafka.
*/
private void sinkEventsToKafka(PCollection<Event> events) {
if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually unneeded because this check is already done in KafkaIO.Write.validate. You can remove it.

Copy link
Contributor Author

@vectorijk vectorijk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @echauchot @iemejia for the review! I will address comments asap.

<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<exclusions>
<exclusion>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed it

@echauchot
Copy link
Contributor

@vectorijk Do you need any help?

@echauchot
Copy link
Contributor

ping?

@vectorijk
Copy link
Contributor Author

vectorijk commented Nov 9, 2017 via email

@vectorijk
Copy link
Contributor Author

@echauchot I tried mvn clean install -DskipTests exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --query=1 --manageResources=true --monitorJobs=true --enforceEncodability=true --enforceImmutability=true --sinkType=KAFKA --bootstrapServers=localhost:9092"

It works for me on my local. I can't reproduce. @iemejia Could you test this on your side?

@echauchot
Copy link
Contributor

@vectorijk thanks for the fixes! I still have 2 comments:

  • what about implementing COMBINED mode?
  • a remark for the future: you should rebase your branch on master and not merge master into your branch (see beam contribution guide). Can you remove the merge commits (using rebase -i) and, rebase on master?
    Waiting for @iemejia to test on his local setup for the org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata'

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting for status on error to see if it is related to my setup and also the COMBINED state.

@vectorijk
Copy link
Contributor Author

@echauchot I will take a look at COMBINED mode and update it soon.

@aaltay
Copy link
Member

aaltay commented Nov 28, 2017

@vectorijk any updates?

@kennknowles
Copy link
Member

How is this going? This seems nice to have. It is probably a good time to rebase and restart review since Nexmark has change a little and the codebase is Java 8.

@vectorijk
Copy link
Contributor Author

@kennknowles I will change and rebase this.

@echauchot
Copy link
Contributor

@vectorijk I'm waiting for the status of the test on @iemejia's setup so that we could see if the org.apache.kafka.common.protocol.types.SchemaException is related to my setup. When it is ok I'll merge the PR. Besides, what about the combined mode?

@aromanenko-dev
Copy link
Contributor

I checked out this PR and rebased it against master (there were some minor conflicts, easy to resolve).
Then I run Nexmark as @echauchot suggested above and it works for me:
$ mvn -Pdirect-runner exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main "-Dexec.args=--sinkType=KAFKA --bootstrapServers=localhost:9092 --runner=DirectRunner --query=1 --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"

The output result:

Configurations:
  Conf  Description
  0000  query:1; sinkType:KAFKA

Performance:
  Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
  0000          10.5                      9510.2                       92000        

@echauchot
Copy link
Contributor

@aromanenko-dev thanks for the feedback! So the issue that I reported was due to my setup then. I'll rebase and merge.

@echauchot
Copy link
Contributor

There was a point in my review about the COMBINED mode but we could support it later in another PR.

@echauchot
Copy link
Contributor

echauchot commented Apr 4, 2018

@vectorijk seems to be unavailable so I propose to do another PR for minor fixes: squash (with obviously same author), rebase on master, and wire up unused sinkEventsToKafka method. Then merge that one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants