diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index 8210ddc04747..4c5374636218 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -166,6 +166,10 @@
+  <properties>
+    <kafka.clients.version>0.10.1.0</kafka.clients.version>
+  </properties>
+
      <groupId>org.apache.beam</groupId>
@@ -261,5 +265,17 @@
      <artifactId>auto-value</artifactId>
      <scope>provided</scope>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.clients.version}</version>
+    </dependency>
+
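
The pom pins a concrete kafka-clients build through the new `kafka.clients.version` property so the broker client used at runtime is explicit. A minimal, hypothetical smoke test against that pinned 0.10.1.0 client; the broker address and topic name are assumptions, not part of this change:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical pre-flight check: verify the pinned 0.10.1.0 client can reach
// the broker before launching the benchmark. localhost:9092 is an assumption.
public class KafkaSmokeTest {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // send() is asynchronous; flush() forces the record out before close.
      producer.send(new ProducerRecord<>("nexmark-source", "smoke-test"));
      producer.flush();
    }
  }
}
```
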
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 550fbd2ce84a..c3807b1b96de 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.nexmark;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -42,6 +43,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -79,13 +81,21 @@
import org.apache.beam.sdk.nexmark.queries.Query9Model;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;
@@ -747,6 +757,68 @@ public void processElement(ProcessContext c) {
}));
}
+  static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
+      new DoFn<Event, byte[]>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ try {
+ byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+ c.output(encodedEvent);
+ } catch (CoderException e1) {
+ LOG.error("Error while sending Event {} to Kafka: serialization error",
+ c.element().toString());
+ }
+ }
+ };
+
+ /**
+ * Send {@code events} to Kafka.
+ */
+  private void sinkEventsToKafka(PCollection<Event> events) {
+    PTransform<PCollection<byte[]>, PDone> io = KafkaIO.<Long, byte[]>write()
+ .withBootstrapServers(options.getBootstrapServers())
+ .withTopic(options.getKafkaSinkTopic())
+ .withKeySerializer(LongSerializer.class)
+ .withValueSerializer(ByteArraySerializer.class).values();
+ events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)).apply(io);
+ }
+
+
+  static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
+      new DoFn<KV<Long, byte[]>, Event>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ byte[] encodedEvent = c.element().getValue();
+ try {
+ Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
+ c.output(event);
+ } catch (CoderException e) {
+ LOG.error("Error while decoding Event from Kafka message: serialization error");
+ }
+ }
+ };
+
+ /**
+ * Return source of events from Kafka.
+ */
+  private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
+ NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());
+
+ if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
+ throw new RuntimeException("Missing --bootstrapServers");
+ }
+
+    KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
+ .withBootstrapServers(options.getBootstrapServers())
+ .withTopic(options.getKafkaSourceTopic())
+ .withKeyDeserializer(LongDeserializer.class)
+ .withValueDeserializer(ByteArrayDeserializer.class);
+
+ return p
+ .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata())
+ .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
+ }
+
/**
* Return Avro source of events from {@code options.getInputFilePrefix}.
*/
@@ -793,6 +865,21 @@ public void processElement(ProcessContext c) {
.apply(queryName + ".WritePubsubEvents", io);
}
+ /**
+ * Send {@code formattedResults} to Kafka.
+ */
+  private void sinkResultsToKafka(PCollection<String> formattedResults) {
+ checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+ "Missing --bootstrapServers");
+
+    PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
+ .withBootstrapServers(options.getBootstrapServers())
+ .withTopic(options.getKafkaSinkTopic())
+ .withKeySerializer(LongSerializer.class)
+ .withValueSerializer(StringSerializer.class).values();
+ formattedResults.apply(queryName + ".WriteKafkaResults", io);
+ }
+
/**
* Send {@code formattedResults} to Pubsub.
*/
@@ -903,6 +990,9 @@ private PCollection<Event> createSource(Pipeline p, final long now) {
case AVRO:
source = sourceEventsFromAvro(p);
break;
+ case KAFKA:
+ source = sourceEventsFromKafka(p);
+ break;
case PUBSUB:
// Setup the sink for the publisher.
switch (configuration.pubSubMode) {
@@ -989,6 +1079,9 @@ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
case PUBSUB:
sinkResultsToPubsub(formattedResults, now);
break;
+ case KAFKA:
+ sinkResultsToKafka(formattedResults);
+ break;
case TEXT:
sinkResultsToText(formattedResults, now);
break;
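
The two DoFns above move events through Kafka as raw bytes by round-tripping them through `Event.CODER` with `CoderUtils`. A self-contained sketch of that round trip, using a built-in coder so it runs without the Nexmark model classes:

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

// Sketch of the encode/decode cycle behind EVENT_TO_BYTEARRAY and
// BYTEARRAY_TO_EVENT, with StringUtf8Coder standing in for Event.CODER.
public class CoderRoundTrip {
  public static void main(String[] args) throws Exception {
    StringUtf8Coder coder = StringUtf8Coder.of();
    byte[] bytes = CoderUtils.encodeToByteArray(coder, "event payload");
    // decodeFromByteArray throws CoderException on corrupt input, which is
    // why both DoFns catch it and log instead of failing the bundle.
    String decoded = CoderUtils.decodeFromByteArray(coder, bytes);
    System.out.println(decoded.equals("event payload")); // true
  }
}
```
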
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 2a2a5a782a65..60c74a2f7e0f 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -400,4 +400,24 @@ public interface NexmarkOptions
Long getWatermarkValidationDelaySeconds();
void setWatermarkValidationDelaySeconds(Long value);
+
+ @Description("Base name of Kafka source topic in streaming mode.")
+ @Nullable
+ @Default.String("nexmark-source")
+ String getKafkaSourceTopic();
+
+ void setKafkaSourceTopic(String value);
+
+ @Description("Base name of Kafka sink topic in streaming mode.")
+ @Nullable
+ @Default.String("nexmark-sink")
+ String getKafkaSinkTopic();
+
+ void setKafkaSinkTopic(String value);
+
+ @Description("Kafka Bootstrap Server domains.")
+ @Nullable
+ String getBootstrapServers();
+
+ void setBootstrapServers(String value);
}
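
The three new options ride the standard `PipelineOptions` machinery, so no extra plumbing is needed. A quick, hypothetical check that the flags bind as expected; the values are placeholders, not part of this change:

```java
import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical wiring check: parse the new Kafka flags the way a benchmark
// entry point would, then read one back.
public class ParseKafkaOptions {
  public static void main(String[] args) {
    NexmarkOptions options = PipelineOptionsFactory.fromArgs(
            "--bootstrapServers=localhost:9092",
            "--kafkaSourceTopic=nexmark-source",
            "--kafkaSinkTopic=nexmark-sink")
        .withValidation()
        .as(NexmarkOptions.class);
    System.out.println(options.getKafkaSourceTopic()); // nexmark-source
  }
}
```
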
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index fa1ef165f612..e88fa9e5f1dd 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -99,7 +99,11 @@ public enum SourceType {
/**
* Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
*/
- PUBSUB
+ PUBSUB,
+ /**
+ * Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline.
+ */
+ KAFKA
}
/**
@@ -118,6 +122,10 @@ public enum SinkType {
* Write to a PubSub topic. It will be drained by this pipeline.
*/
PUBSUB,
+ /**
+ * Write to a Kafka topic. It will be drained by this pipeline.
+ */
+ KAFKA,
/**
* Write to a text file. Only works in batch mode.
*/
@@ -129,7 +137,7 @@ public enum SinkType {
/**
* Write raw Events to BigQuery.
*/
- BIGQUERY,
+ BIGQUERY
}
/**
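
Since each enum gains a constant, every switch over SourceType and SinkType (createSource and sink in NexmarkLauncher above) needs a matching case. A sketch of the dispatch pattern; the DIRECT constant is assumed from the existing enum:

```java
// Sketch: dispatch over the enlarged SourceType. DIRECT is assumed to exist
// alongside AVRO and PUBSUB in the original enum; KAFKA is the new arm.
static String describe(NexmarkUtils.SourceType type) {
  switch (type) {
    case DIRECT:
      return "synthetic events generated in-process";
    case AVRO:
      return "events replayed from Avro files";
    case PUBSUB:
      return "events read from a Pub/Sub topic";
    case KAFKA:
      return "events read from a Kafka topic";
    default:
      throw new IllegalArgumentException("unknown source type: " + type);
  }
}
```
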