From 97d32bdfd351a4e6b542242d31f92a310b44fae5 Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Wed, 4 Apr 2018 15:06:14 +0200
Subject: [PATCH 1/3] [BEAM-2852] Added Kafka source of events and sink of
 results

---
 sdks/java/nexmark/pom.xml                     | 35 +++++++-
 .../beam/sdk/nexmark/NexmarkLauncher.java     | 93 +++++++++++++++++++
 .../beam/sdk/nexmark/NexmarkOptions.java      | 20 ++++
 .../apache/beam/sdk/nexmark/NexmarkUtils.java | 12 ++-
 4 files changed, 156 insertions(+), 4 deletions(-)

diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index 613ddc70a583..bc1113839059 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -192,9 +192,22 @@
             -da
           </argLine>
         </configuration>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
     </plugins>
   </build>

+  <properties>
+    <kafka.clients.version>0.10.1.0</kafka.clients.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -274,13 +287,13 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>compile</scope>
     </dependency>
-
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>compile</scope>
     </dependency>
-
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>
@@ -303,5 +316,23 @@
       <artifactId>commons-lang3</artifactId>
       <scope>runtime</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.clients.version}</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index bd746ce537a8..cb8820f62a27 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.nexmark;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -42,6 +43,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -86,13 +88,21 @@
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import org.slf4j.LoggerFactory;
 
@@ -764,6 +774,68 @@ public void processElement(ProcessContext c) {
         }));
   }
 
+  static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
+      new DoFn<Event, byte[]>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          try {
+            byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+            c.output(encodedEvent);
+          } catch (CoderException e1) {
+            LOG.error("Error while sending Event {} to Kafka: serialization error",
+                c.element().toString());
+          }
+        }
+      };
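Aside (not part of the patch): EVENT_TO_BYTEARRAY and its counterpart BYTEARRAY_TO_EVENT below round-trip events through Event.CODER using Beam's CoderUtils. A minimal sketch of that contract; the class and method names here are illustrative only:

    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.nexmark.model.Event;
    import org.apache.beam.sdk.util.CoderUtils;

    class EventRoundTrip {
      // Encode the way EVENT_TO_BYTEARRAY does, then decode the way the
      // matching BYTEARRAY_TO_EVENT fn (added below) does.
      static Event roundTrip(Event event) throws CoderException {
        byte[] bytes = CoderUtils.encodeToByteArray(Event.CODER, event); // Kafka message value
        return CoderUtils.decodeFromByteArray(Event.CODER, bytes);       // recovered Event
      }
    }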
+
+  /**
+   * Send {@code events} to Kafka.
+   */
+  private void sinkEventsToKafka(PCollection<Event> events) {
+    PTransform<PCollection<byte[]>, PDone> io = KafkaIO.<Long, byte[]>write()
+        .withBootstrapServers(options.getBootstrapServers())
+        .withTopic(options.getKafkaSinkTopic())
+        .withKeySerializer(LongSerializer.class)
+        .withValueSerializer(ByteArraySerializer.class).values();
+    events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)).apply(io);
+  }
+
+
+  static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
+      new DoFn<KV<Long, byte[]>, Event>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          byte[] encodedEvent = c.element().getValue();
+          try {
+            Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
+            c.output(event);
+          } catch (CoderException e) {
+            LOG.error("Error while decoding Event from Kafka message: serialization error");
+          }
+        }
+      };
+
+  /**
+   * Return source of events from Kafka.
+   */
+  private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
+    NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());
+
+    if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
+      throw new RuntimeException("Missing --bootstrapServers");
+    }
+
+    KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
+        .withBootstrapServers(options.getBootstrapServers())
+        .withTopic(options.getKafkaSourceTopic())
+        .withKeyDeserializer(LongDeserializer.class)
+        .withValueDeserializer(ByteArrayDeserializer.class);
+
+    return p
+        .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata())
+        .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
+  }
+
   /**
    * Return Avro source of events from {@code options.getInputFilePrefix}.
    */
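Note on the read path above: withoutMetadata() drops the Kafka record metadata and yields KV<Long, byte[]> pairs, and BYTEARRAY_TO_EVENT consumes only the value side, so the Long key is effectively unused. A self-contained sketch of that shape, with a placeholder bootstrap server and the default topic name from NexmarkOptions:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.LongDeserializer;

    class ReadPathSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Each element is a KV whose value holds the coder-encoded Event bytes.
        PCollection<KV<Long, byte[]>> records =
            p.apply(KafkaIO.<Long, byte[]>read()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("nexmark-source")            // default from NexmarkOptions
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withoutMetadata());
        // records would then be decoded with BYTEARRAY_TO_EVENT as in the patch.
        p.run().waitUntilFinish();
      }
    }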
@@ -813,6 +885,21 @@ public void processElement(ProcessContext c) {
         .apply(queryName + ".WritePubsubEvents", io);
   }
 
+  /**
+   * Send {@code formattedResults} to Kafka.
+   */
+  private void sinkResultsToKafka(PCollection<String> formattedResults) {
+    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+        "Missing --bootstrapServers");
+
+    PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
+        .withBootstrapServers(options.getBootstrapServers())
+        .withTopic(options.getKafkaSinkTopic())
+        .withKeySerializer(LongSerializer.class)
+        .withValueSerializer(StringSerializer.class).values();
+    formattedResults.apply(queryName + ".WriteKafkaResults", io);
+  }
+
   /**
    * Send {@code formattedResults} to Pubsub.
    */
@@ -923,6 +1010,9 @@ private PCollection<Event> createSource(Pipeline p, final long now) {
       case AVRO:
         source = sourceEventsFromAvro(p);
         break;
+      case KAFKA:
+        source = sourceEventsFromKafka(p);
+        break;
       case PUBSUB:
         // Setup the sink for the publisher.
         switch (configuration.pubSubMode) {
@@ -1010,6 +1100,9 @@ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
       case PUBSUB:
         sinkResultsToPubsub(formattedResults, now);
         break;
+      case KAFKA:
+        sinkResultsToKafka(formattedResults);
+        break;
       case TEXT:
         sinkResultsToText(formattedResults, now);
         break;
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 0c5c1c1c3685..a3386b66d2b2 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -406,4 +406,24 @@ public interface NexmarkOptions
   String getQueryLanguage();
 
   void setQueryLanguage(String value);
+
+  @Description("Base name of Kafka source topic in streaming mode.")
+  @Nullable
+  @Default.String("nexmark-source")
+  String getKafkaSourceTopic();
+
+  void setKafkaSourceTopic(String value);
+
+  @Description("Base name of Kafka sink topic in streaming mode.")
+  @Nullable
+  @Default.String("nexmark-sink")
+  String getKafkaSinkTopic();
+
+  void setKafkaSinkTopic(String value);
+
+  @Description("Kafka Bootstrap Server domains.")
+  @Nullable
+  String getBootstrapServers();
+
+  void setBootstrapServers(String value);
 }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 3eb6f79c3ea3..5d89dbde4393 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -99,7 +99,11 @@ public enum SourceType {
     /**
      * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
      */
-    PUBSUB
+    PUBSUB,
+    /**
+     * Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline.
+     */
+    KAFKA
   }
 
   /**
@@ -118,6 +122,10 @@ public enum SinkType {
     /**
      * Write to a PubSub topic. It will be drained by this pipeline.
      */
     PUBSUB,
+    /**
+     * Write to a Kafka topic. It will be drained by this pipeline.
+     */
+    KAFKA,
     /**
      * Write to a text file. Only works in batch mode.
      */
@@ -129,7 +137,7 @@
     /**
      * Write raw Events to BigQuery.
      */
-    BIGQUERY,
+    BIGQUERY
   }
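The getters added to NexmarkOptions above are ordinary Beam pipeline options, so PipelineOptionsFactory binds them to command-line flags automatically. A hypothetical parsing sketch; the flag values in the comment are examples only:

    import org.apache.beam.sdk.nexmark.NexmarkOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class OptionsSketch {
      public static void main(String[] args) {
        // e.g. --sourceType=KAFKA --sinkType=KAFKA --bootstrapServers=localhost:9092
        NexmarkOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
        System.out.println(options.getKafkaSourceTopic()); // "nexmark-source" unless overridden
        System.out.println(options.getKafkaSinkTopic());   // "nexmark-sink" unless overridden
      }
    }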
From 5123108df080b7f3921df03b6ca8e1bf82deba1a Mon Sep 17 00:00:00 2001
From: Etienne Chauchot
Date: Wed, 4 Apr 2018 17:03:32 +0200
Subject: [PATCH 2/3] [BEAM-2852] Fix the code that uses Long key whereas there
 is no key in the PCollection

---
 .../beam/sdk/nexmark/NexmarkLauncher.java | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index cb8820f62a27..172bdc26bfd8 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -88,20 +88,17 @@
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import org.slf4j.LoggerFactory;
@@ -792,12 +789,14 @@ public void processElement(ProcessContext c) {
    * Send {@code events} to Kafka.
    */
   private void sinkEventsToKafka(PCollection<Event> events) {
-    PTransform<PCollection<byte[]>, PDone> io = KafkaIO.<Long, byte[]>write()
-        .withBootstrapServers(options.getBootstrapServers())
-        .withTopic(options.getKafkaSinkTopic())
-        .withKeySerializer(LongSerializer.class)
-        .withValueSerializer(ByteArraySerializer.class).values();
-    events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)).apply(io);
+    PCollection<byte[]> eventToBytes =
+        events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
+    eventToBytes.apply(KafkaIO.<Void, byte[]>write()
+        .withBootstrapServers(options.getBootstrapServers())
+        .withTopic(options.getKafkaSinkTopic())
+        .withValueSerializer(ByteArraySerializer.class)
+        .values());
+
   }
 
 
@@ -892,12 +891,13 @@ private void sinkResultsToKafka(PCollection<String> formattedResults) {
     checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
         "Missing --bootstrapServers");
 
-    PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
+    formattedResults.apply(
+        queryName + ".WriteKafkaResults",
+        KafkaIO.<Void, String>write()
         .withBootstrapServers(options.getBootstrapServers())
         .withTopic(options.getKafkaSinkTopic())
-        .withKeySerializer(LongSerializer.class)
-        .withValueSerializer(StringSerializer.class).values();
-    formattedResults.apply(queryName + ".WriteKafkaResults", io);
+        .withValueSerializer(StringSerializer.class)
+        .values());
   }
 
   /**
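The fix relies on KafkaIO.Write.values(), which accepts a PCollection of bare values and publishes records without keys, so a Long key serializer was never needed. A keyless-write sketch under the same assumptions, with placeholder server and topic names:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.kafka.common.serialization.StringSerializer;

    class KeylessWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("result-1", "result-2"))
            .apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("nexmark-sink")              // default from NexmarkOptions
                .withValueSerializer(StringSerializer.class)
                .values()); // publish values only; no key serializer required
        p.run().waitUntilFinish();
      }
    }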
From 6df0adde868a559f947865391266f30bdaeac899 Mon Sep 17 00:00:00 2001
From: Alexey Romanenko
Date: Fri, 6 Apr 2018 11:24:01 +0200
Subject: [PATCH 3/3] [BEAM-2852] Fix minor code style issues

[BEAM-2852] Adjust gradle build with maven
---
 sdks/java/nexmark/build.gradle                       | 2 ++
 sdks/java/nexmark/pom.xml                            | 4 ----
 .../org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 9 ++++-----
 3 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index b8f354d7af50..83a3d05f99d4 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -27,6 +27,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-extensions-sql", configuration: "shadow")
+  shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
   shadow library.java.google_api_services_bigquery
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
@@ -38,6 +39,7 @@ dependencies {
   shadow library.java.junit
   shadow library.java.hamcrest_core
   shadow library.java.commons_lang3
+  shadow library.java.kafka_clients
   shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
   shadow library.java.slf4j_jdk14
   testCompile library.java.hamcrest_core
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index bc1113839059..4d943751675f 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -204,10 +204,6 @@
     </plugins>
   </build>
 
-  <properties>
-    <kafka.clients.version>0.10.1.0</kafka.clients.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 172bdc26bfd8..cd24897664ac 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -820,18 +820,17 @@ public void processElement(ProcessContext c) {
   private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
     NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());
 
-    if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
-      throw new RuntimeException("Missing --bootstrapServers");
-    }
+    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+        "Missing --bootstrapServers");
 
-    KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
+    KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read()
         .withBootstrapServers(options.getBootstrapServers())
         .withTopic(options.getKafkaSourceTopic())
         .withKeyDeserializer(LongDeserializer.class)
         .withValueDeserializer(ByteArrayDeserializer.class);
 
     return p
-        .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata())
+        .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
         .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
   }
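Style note: replacing the hand-rolled RuntimeException with Guava's checkArgument preserves the message while raising the more conventional IllegalArgumentException for bad arguments. A small illustration with a hypothetical helper:

    import static com.google.common.base.Preconditions.checkArgument;

    import com.google.common.base.Strings;

    class PreconditionSketch {
      static void requireBootstrapServers(String servers) {
        // Fails fast with IllegalArgumentException("Missing --bootstrapServers").
        checkArgument(!Strings.isNullOrEmpty(servers), "Missing --bootstrapServers");
      }

      public static void main(String[] args) {
        requireBootstrapServers("localhost:9092"); // passes
        requireBootstrapServers("");               // throws IllegalArgumentException
      }
    }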