diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index b8f354d7af50..83a3d05f99d4 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -27,6 +27,7 @@ dependencies {
shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
shadow project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
shadow project(path: ":beam-sdks-java-extensions-sql", configuration: "shadow")
+ shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
shadow library.java.google_api_services_bigquery
shadow library.java.jackson_core
shadow library.java.jackson_annotations
@@ -38,6 +39,7 @@ dependencies {
shadow library.java.junit
shadow library.java.hamcrest_core
shadow library.java.commons_lang3
+ shadow library.java.kafka_clients
shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
shadow library.java.slf4j_jdk14
testCompile library.java.hamcrest_core
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index 613ddc70a583..4d943751675f 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -192,6 +192,15 @@
           <argLine>-da</argLine>
         </configuration>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
     </plugins>
@@ -274,13 +283,13 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>compile</scope>
-    </dependency>
+    </dependency>

     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>compile</scope>
-    </dependency>
+    </dependency>

     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>
@@ -303,5 +312,23 @@
       <artifactId>commons-lang3</artifactId>
       <scope>runtime</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.clients.version}</version>
+    </dependency>
+
   </dependencies>
 </project>
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index bd746ce537a8..cd24897664ac 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.nexmark;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -42,6 +43,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -88,11 +90,16 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;
@@ -764,6 +771,69 @@ public void processElement(ProcessContext c) {
         }));
   }

+  static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
+      new DoFn<Event, byte[]>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          try {
+            byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+            c.output(encodedEvent);
+          } catch (CoderException e1) {
+            LOG.error("Error while sending Event {} to Kafka: serialization error",
+                c.element().toString());
+          }
+        }
+      };
+
+  /**
+   * Send {@code events} to Kafka.
+   */
+  private void sinkEventsToKafka(PCollection<Event> events) {
+    PCollection<byte[]> eventToBytes =
+        events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
+    eventToBytes.apply(KafkaIO.<Void, byte[]>write()
+        .withBootstrapServers(options.getBootstrapServers())
+        .withTopic(options.getKafkaSinkTopic())
+        .withValueSerializer(ByteArraySerializer.class)
+        .values());
+
+  }
+
+
+  static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
+      new DoFn<KV<Long, byte[]>, Event>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          byte[] encodedEvent = c.element().getValue();
+          try {
+            Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
+            c.output(event);
+          } catch (CoderException e) {
+            LOG.error("Error while decoding Event from Kafka message: serialization error");
+          }
+        }
+      };
+
+  /**
+   * Return source of events from Kafka.
+   */
+  private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
+    NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());
+
+    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+        "Missing --bootstrapServers");
+
+    KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read()
+        .withBootstrapServers(options.getBootstrapServers())
+        .withTopic(options.getKafkaSourceTopic())
+        .withKeyDeserializer(LongDeserializer.class)
+        .withValueDeserializer(ByteArrayDeserializer.class);
+
+    return p
+        .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
+        .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
+  }
+
/**
* Return Avro source of events from {@code options.getInputFilePrefix}.
*/
@@ -813,6 +883,22 @@ public void processElement(ProcessContext c) {
         .apply(queryName + ".WritePubsubEvents", io);
   }

+  /**
+   * Send {@code formattedResults} to Kafka.
+   */
+  private void sinkResultsToKafka(PCollection<String> formattedResults) {
+    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+        "Missing --bootstrapServers");
+
+    formattedResults.apply(
+        queryName + ".WriteKafkaResults",
+        KafkaIO.<Void, String>write()
+            .withBootstrapServers(options.getBootstrapServers())
+            .withTopic(options.getKafkaSinkTopic())
+            .withValueSerializer(StringSerializer.class)
+            .values());
+  }
+
/**
* Send {@code formattedResults} to Pubsub.
*/
@@ -923,6 +1009,9 @@ private PCollection<Event> createSource(Pipeline p, final long now) {
case AVRO:
source = sourceEventsFromAvro(p);
break;
+ case KAFKA:
+ source = sourceEventsFromKafka(p);
+ break;
case PUBSUB:
// Setup the sink for the publisher.
switch (configuration.pubSubMode) {
@@ -1010,6 +1099,9 @@ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
case PUBSUB:
sinkResultsToPubsub(formattedResults, now);
break;
+ case KAFKA:
+ sinkResultsToKafka(formattedResults);
+ break;
case TEXT:
sinkResultsToText(formattedResults, now);
break;
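
Both DoFns in this file rely on the same mechanism: `CoderUtils` turns an `Event` into the `byte[]` payload that crosses Kafka, and back again. A minimal sketch of that round trip, not part of the patch, substituting `StringUtf8Coder` for `Event.CODER` so it runs without the Nexmark model on the classpath:

```java
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

public class CoderRoundTrip {
  public static void main(String[] args) throws CoderException {
    // Encode a value into the byte[] form that EVENT_TO_BYTEARRAY hands to Kafka...
    byte[] bytes = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "event payload");
    // ...and decode it back, as BYTEARRAY_TO_EVENT does with Event.CODER.
    String decoded = CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), bytes);
    System.out.println(decoded); // prints "event payload"
  }
}
```

If encoding fails, the DoFns above log and drop the element rather than failing the bundle, which keeps the benchmark running at the cost of silently losing malformed events.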
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 0c5c1c1c3685..a3386b66d2b2 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -406,4 +406,24 @@ public interface NexmarkOptions
   String getQueryLanguage();

   void setQueryLanguage(String value);
+
+  @Description("Base name of Kafka source topic in streaming mode.")
+  @Nullable
+  @Default.String("nexmark-source")
+  String getKafkaSourceTopic();
+
+  void setKafkaSourceTopic(String value);
+
+  @Description("Base name of Kafka sink topic in streaming mode.")
+  @Nullable
+  @Default.String("nexmark-sink")
+  String getKafkaSinkTopic();
+
+  void setKafkaSinkTopic(String value);
+
+  @Description("Kafka bootstrap servers (comma-separated list of host:port).")
+  @Nullable
+  String getBootstrapServers();
+
+  void setBootstrapServers(String value);
 }
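
The new getters behave like any other Beam pipeline options, so the `@Default.String` topic names apply whenever the flags are omitted. A hypothetical snippet, assuming a broker on localhost:9092 (`KafkaOptionsExample` is a made-up class name, not part of the patch):

```java
import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaOptionsExample {
  public static void main(String[] args) {
    // Parse only the broker address; the topics fall back to the
    // defaults declared above ("nexmark-source" / "nexmark-sink").
    NexmarkOptions options = PipelineOptionsFactory
        .fromArgs("--bootstrapServers=localhost:9092")
        .as(NexmarkOptions.class);

    System.out.println(options.getBootstrapServers()); // localhost:9092
    System.out.println(options.getKafkaSourceTopic()); // nexmark-source
    System.out.println(options.getKafkaSinkTopic());   // nexmark-sink
  }
}
```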
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 3eb6f79c3ea3..5d89dbde4393 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -99,7 +99,11 @@ public enum SourceType {
/**
* Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
*/
- PUBSUB
+ PUBSUB,
+ /**
+ * Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline.
+ */
+ KAFKA
}
/**
@@ -118,6 +122,10 @@ public enum SinkType {
* Write to a PubSub topic. It will be drained by this pipeline.
*/
PUBSUB,
+ /**
+ * Write to a Kafka topic. It will be drained by this pipeline.
+ */
+ KAFKA,
/**
* Write to a text file. Only works in batch mode.
*/
@@ -129,7 +137,7 @@ public enum SinkType {
/**
* Write raw Events to BigQuery.
*/
- BIGQUERY,
+ BIGQUERY
}
/**
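
Because `sinkResultsToKafka` writes results with a plain `StringSerializer`, any ordinary Kafka consumer can tail them when `--sinkType=KAFKA` is selected. A throwaway sketch, assuming a broker on localhost:9092 and the default sink topic; `NexmarkSinkTailer` and the group id are made-up names, not part of the patch:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NexmarkSinkTailer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    props.put("group.id", "nexmark-tailer");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("nexmark-sink"));
      // Poll a few times and print whatever formatted results have arrived.
      for (int i = 0; i < 10; i++) {
        for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
          System.out.println(record.value());
        }
      }
    }
  }
}
```

Keys on the sink topic are null (the pipeline writes with `.values()`), so only the record values carry data.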