From 46e380c162c32f55b74e2ed383978e4043fcf055 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Tue, 3 Oct 2017 10:57:31 -0700 Subject: [PATCH 1/6] sinkToKafka works still experiment sourceFromKafka --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- sdks/java/nexmark/pom.xml | 26 ++++ .../beam/sdk/nexmark/NexmarkLauncher.java | 122 ++++++++++++++++++ .../beam/sdk/nexmark/NexmarkOptions.java | 13 ++ .../apache/beam/sdk/nexmark/NexmarkUtils.java | 12 +- 5 files changed, 172 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index dae4c1d4c1b5..ef081768428f 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -219,7 +219,7 @@ * strings.apply(KafkaIO.write() * .withBootstrapServers("broker_1:9092,broker_2:9092") * .withTopic("results") - * .withValueSerializer(new StringSerializer()) // just need serializer for value + * .withValueSerializer(StringSerializer.class) // just need serializer for value * .values() * ); * } diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml index 67628a128316..88dbd0649740 100644 --- a/sdks/java/nexmark/pom.xml +++ b/sdks/java/nexmark/pom.xml @@ -165,6 +165,14 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + @@ -257,5 +265,23 @@ beam-runners-direct-java test + + + org.apache.beam + beam-sdks-java-io-kafka + + + org.apache.kafka + kafka-clients + + + + + + org.apache.kafka + kafka-clients + 0.10.1.0 + + diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 550fbd2ce84a..d6896a7b398f 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -37,11 +37,13 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -79,13 +81,23 @@ import org.apache.beam.sdk.nexmark.queries.Query9Model; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import org.slf4j.LoggerFactory; @@ -747,6 +759,96 @@ public void processElement(ProcessContext c) { })); } +// DoFn eventProcess = new DoFn(){ +// @ProcessElement +// public void processElement(ProcessContext c) { +// c.output(c.element().toString()); +// } +// }; + + /** + * Send {@code events} to Kafka. + */ + private void sinkEventsToKafka(PCollection events) { +// PTransform, PDone> io = KafkaIO.write() +// .withBootstrapServers("localhost:9092") +// .withTopic("generateEvent") +// .withKeySerializer(LongSerializer.class) +// .withValueSerializer(ByteArraySerializer.class).values(); + PTransform, PDone> io = KafkaIO.write() + .withBootstrapServers("localhost:9092") + .withTopic("generateEvent") + .withKeySerializer(LongSerializer.class) + .withValueSerializer(StringSerializer.class).values(); + TypeDescriptor td = new TypeDescriptor(){}; + + +// SerializableFunction f = new SerializableFunction() { +// @Override +// public String apply(Event element) { +// return element.toString(); +// } +// }; + +// SerializableFunction f = new SerializableFunction() { +// @Override +// public String apply(Event input) { +// return input.toString(); +// } +// }; +// +// events.apply(MapElements.into(TypeDescriptors.strings()).via(f +// )).apply(io); + +// events.apply(ParDo.of(eventProcess)).getPipeline() +// .apply(queryName + ".WriteKafkaEvents", io); + + events.apply( + MapElements.into(td).via(new SimpleFunction() { + @Override + public String apply(Event input) { + return input.toString(); + } + }) + ).setCoder(StringUtf8Coder.of()).apply(io); + } + + /** + * Return source of events from Kafka. + */ + private PCollection sourceEventsFromKafka(Pipeline p) { + PCollection e = sourceEventsFromSynthetic(p); + sinkEventsToKafka(e); + + NexmarkUtils.console("%s", "Here ->>>>>"); + String filename = options.getInputPath(); + NexmarkUtils.console("Reading events from Kafka %s", filename); + + if (Strings.isNullOrEmpty(options.getBootstrapServers())) { + throw new RuntimeException("Missing --bootstrapServers"); + } + + KafkaIO.Read io = KafkaIO.read() + .withBootstrapServers(options.getBootstrapServers()) + .withTopic("generateEvent") + .withKeyDeserializer(LongDeserializer.class) + .withValueDeserializer(ByteArrayDeserializer.class); + return p + .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata()) + .apply(queryName + ".KafkaToEvents", ParDo.of(new DoFn, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] payload = c.element().getValue(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + c.output(event); + } catch (CoderException e) { + LOG.error("Error while decoding Event from Kafka message: serialization error"); + } + } + })); + } + /** * Return Avro source of events from {@code options.getInputFilePrefix}. */ @@ -793,6 +895,19 @@ public void processElement(ProcessContext c) { .apply(queryName + ".WritePubsubEvents", io); } + /** + * Send {@code formattedResults} to Kafka. + */ + private void sinkResultsToKafka(PCollection formattedResults) { + PTransform, PDone> io = KafkaIO.write() + .withBootstrapServers("localhost:9092") + .withTopic("writeToTest") + .withKeySerializer(LongSerializer.class) + .withValueSerializer(StringSerializer.class).values(); + formattedResults + .apply(queryName + ".WriteKafkaResults", io); + } + /** * Send {@code formattedResults} to Pubsub. */ @@ -899,10 +1014,14 @@ private PCollection createSource(Pipeline p, final long now) { switch (configuration.sourceType) { case DIRECT: source = sourceEventsFromSynthetic(p); + sinkEventsToKafka(source); break; case AVRO: source = sourceEventsFromAvro(p); break; + case KAFKA: + source = sourceEventsFromKafka(p); + break; case PUBSUB: // Setup the sink for the publisher. switch (configuration.pubSubMode) { @@ -989,6 +1108,9 @@ private void sink(PCollection> results, long now) { case PUBSUB: sinkResultsToPubsub(formattedResults, now); break; + case KAFKA: + sinkResultsToKafka(formattedResults); + break; case TEXT: sinkResultsToText(formattedResults, now); break; diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 2a2a5a782a65..77aa085d288d 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -400,4 +400,17 @@ public interface NexmarkOptions Long getWatermarkValidationDelaySeconds(); void setWatermarkValidationDelaySeconds(Long value); + + @Description("Base name of Kafka topic in streaming mode.") + @Nullable + @Default.String("nexmark") + String getKafkaTopic(); + + void setKafkaTopic(String value); + + @Description("Kafka Bootstrap Server domains.") + @Nullable + String getBootstrapServers(); + + void setBootstrapServers(String value); } diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java index fa1ef165f612..e88fa9e5f1dd 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java @@ -99,7 +99,11 @@ public enum SourceType { /** * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline. */ - PUBSUB + PUBSUB, + /** + * Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline. + */ + KAFKA } /** @@ -118,6 +122,10 @@ public enum SinkType { * Write to a PubSub topic. It will be drained by this pipeline. */ PUBSUB, + /** + * Write to a Kafka topic. It will be drained by this pipeline. + */ + KAFKA, /** * Write to a text file. Only works in batch mode. */ @@ -129,7 +137,7 @@ public enum SinkType { /** * Write raw Events to BigQuery. */ - BIGQUERY, + BIGQUERY } /** From 18128d29bb25db4a4381509fd19a1df5652cb395 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Tue, 3 Oct 2017 14:21:58 -0700 Subject: [PATCH 2/6] source event from kafka --- sdks/java/nexmark/pom.xml | 6 +- .../beam/sdk/nexmark/NexmarkLauncher.java | 106 ++++++++---------- 2 files changed, 51 insertions(+), 61 deletions(-) diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml index 88dbd0649740..e3b13cb625d9 100644 --- a/sdks/java/nexmark/pom.xml +++ b/sdks/java/nexmark/pom.xml @@ -176,6 +176,10 @@ + + 0.10.1.0 + + org.apache.beam @@ -280,7 +284,7 @@ org.apache.kafka kafka-clients - 0.10.1.0 + ${kafka.clients.version} diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index d6896a7b398f..9bf1851a75cf 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; @@ -81,10 +80,8 @@ import org.apache.beam.sdk.nexmark.queries.Query9Model; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -95,6 +92,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -759,60 +757,60 @@ public void processElement(ProcessContext c) { })); } -// DoFn eventProcess = new DoFn(){ -// @ProcessElement -// public void processElement(ProcessContext c) { -// c.output(c.element().toString()); -// } -// }; + static final ParDo.SingleOutput EVENT_TO_BYTEARRAY = + ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + try { + byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(payload); + } catch (CoderException e1) { + LOG.error("Error while sending Event {} to Kafka: serialization error", + c.element().toString()); + } + } + }); /** * Send {@code events} to Kafka. */ private void sinkEventsToKafka(PCollection events) { -// PTransform, PDone> io = KafkaIO.write() -// .withBootstrapServers("localhost:9092") -// .withTopic("generateEvent") -// .withKeySerializer(LongSerializer.class) -// .withValueSerializer(ByteArraySerializer.class).values(); - PTransform, PDone> io = KafkaIO.write() + PTransform, PDone> io = KafkaIO.write() .withBootstrapServers("localhost:9092") .withTopic("generateEvent") .withKeySerializer(LongSerializer.class) - .withValueSerializer(StringSerializer.class).values(); + .withValueSerializer(ByteArraySerializer.class).values(); +// PTransform, PDone> io = KafkaIO.write() +// .withBootstrapServers("localhost:9092") +// .withTopic("generateEvent") +// .withKeySerializer(LongSerializer.class) +// .withValueSerializer(StringSerializer.class).values(); TypeDescriptor td = new TypeDescriptor(){}; -// SerializableFunction f = new SerializableFunction() { -// @Override -// public String apply(Event element) { -// return element.toString(); -// } -// }; - -// SerializableFunction f = new SerializableFunction() { -// @Override -// public String apply(Event input) { -// return input.toString(); -// } -// }; -// -// events.apply(MapElements.into(TypeDescriptors.strings()).via(f -// )).apply(io); - -// events.apply(ParDo.of(eventProcess)).getPipeline() -// .apply(queryName + ".WriteKafkaEvents", io); - - events.apply( - MapElements.into(td).via(new SimpleFunction() { - @Override - public String apply(Event input) { - return input.toString(); - } - }) - ).setCoder(StringUtf8Coder.of()).apply(io); + +// events.apply( +// MapElements.into(td) +// .via((Event e) -> Bytes.wrap(e.toString().getBytes())) +// ).apply(io); + events.apply("Event To String", EVENT_TO_BYTEARRAY).apply(io); } + + static final ParDo.SingleOutput, Event> BYTEARRAY_TO_EVENT = + ParDo.of(new DoFn, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] payload = c.element().getValue(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + c.output(event); + } catch (CoderException e) { + LOG.error("Error while decoding Event from Kafka message: serialization error"); + } + } + }); + /** * Return source of events from Kafka. */ @@ -835,18 +833,7 @@ private PCollection sourceEventsFromKafka(Pipeline p) { .withValueDeserializer(ByteArrayDeserializer.class); return p .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata()) - .apply(queryName + ".KafkaToEvents", ParDo.of(new DoFn, Event>() { - @ProcessElement - public void processElement(ProcessContext c) { - byte[] payload = c.element().getValue(); - try { - Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); - c.output(event); - } catch (CoderException e) { - LOG.error("Error while decoding Event from Kafka message: serialization error"); - } - } - })); + .apply(queryName + ".KafkaToEvents", BYTEARRAY_TO_EVENT); } /** @@ -900,12 +887,11 @@ public void processElement(ProcessContext c) { */ private void sinkResultsToKafka(PCollection formattedResults) { PTransform, PDone> io = KafkaIO.write() - .withBootstrapServers("localhost:9092") - .withTopic("writeToTest") + .withBootstrapServers(options.getBootstrapServers()) + .withTopic(options.getKafkaTopic()) .withKeySerializer(LongSerializer.class) .withValueSerializer(StringSerializer.class).values(); - formattedResults - .apply(queryName + ".WriteKafkaResults", io); + formattedResults.apply(queryName + ".WriteKafkaResults", io); } /** From d603dcdf1c4ad6e696eb0fec60e2d6fd31322234 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Tue, 3 Oct 2017 14:38:02 -0700 Subject: [PATCH 3/6] clean code --- sdks/java/nexmark/pom.xml | 8 --- .../beam/sdk/nexmark/NexmarkLauncher.java | 49 +++++++------------ .../beam/sdk/nexmark/NexmarkOptions.java | 15 ++++-- 3 files changed, 30 insertions(+), 42 deletions(-) diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml index e3b13cb625d9..d896e8f35615 100644 --- a/sdks/java/nexmark/pom.xml +++ b/sdks/java/nexmark/pom.xml @@ -165,14 +165,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 9bf1851a75cf..7311ddba76ec 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -90,7 +90,6 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongDeserializer; @@ -762,8 +761,8 @@ public void processElement(ProcessContext c) { @ProcessElement public void processElement(ProcessContext c) { try { - byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); - c.output(payload); + byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(encodedEvent); } catch (CoderException e1) { LOG.error("Error while sending Event {} to Kafka: serialization error", c.element().toString()); @@ -775,25 +774,16 @@ public void processElement(ProcessContext c) { * Send {@code events} to Kafka. */ private void sinkEventsToKafka(PCollection events) { + if (Strings.isNullOrEmpty(options.getBootstrapServers())) { + throw new RuntimeException("Missing --bootstrapServers"); + } + PTransform, PDone> io = KafkaIO.write() - .withBootstrapServers("localhost:9092") - .withTopic("generateEvent") + .withBootstrapServers(options.getBootstrapServers()) + .withTopic(options.getKafkaSinkTopic()) .withKeySerializer(LongSerializer.class) .withValueSerializer(ByteArraySerializer.class).values(); -// PTransform, PDone> io = KafkaIO.write() -// .withBootstrapServers("localhost:9092") -// .withTopic("generateEvent") -// .withKeySerializer(LongSerializer.class) -// .withValueSerializer(StringSerializer.class).values(); - TypeDescriptor td = new TypeDescriptor(){}; - - - -// events.apply( -// MapElements.into(td) -// .via((Event e) -> Bytes.wrap(e.toString().getBytes())) -// ).apply(io); - events.apply("Event To String", EVENT_TO_BYTEARRAY).apply(io); + events.apply("Event to bytes", EVENT_TO_BYTEARRAY).apply(io); } @@ -801,9 +791,9 @@ private void sinkEventsToKafka(PCollection events) { ParDo.of(new DoFn, Event>() { @ProcessElement public void processElement(ProcessContext c) { - byte[] payload = c.element().getValue(); + byte[] encodedEvent = c.element().getValue(); try { - Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent); c.output(event); } catch (CoderException e) { LOG.error("Error while decoding Event from Kafka message: serialization error"); @@ -815,12 +805,7 @@ public void processElement(ProcessContext c) { * Return source of events from Kafka. */ private PCollection sourceEventsFromKafka(Pipeline p) { - PCollection e = sourceEventsFromSynthetic(p); - sinkEventsToKafka(e); - - NexmarkUtils.console("%s", "Here ->>>>>"); - String filename = options.getInputPath(); - NexmarkUtils.console("Reading events from Kafka %s", filename); + NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic()); if (Strings.isNullOrEmpty(options.getBootstrapServers())) { throw new RuntimeException("Missing --bootstrapServers"); @@ -828,9 +813,10 @@ private PCollection sourceEventsFromKafka(Pipeline p) { KafkaIO.Read io = KafkaIO.read() .withBootstrapServers(options.getBootstrapServers()) - .withTopic("generateEvent") + .withTopic(options.getKafkaSourceTopic()) .withKeyDeserializer(LongDeserializer.class) .withValueDeserializer(ByteArrayDeserializer.class); + return p .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata()) .apply(queryName + ".KafkaToEvents", BYTEARRAY_TO_EVENT); @@ -886,9 +872,13 @@ public void processElement(ProcessContext c) { * Send {@code formattedResults} to Kafka. */ private void sinkResultsToKafka(PCollection formattedResults) { + if (Strings.isNullOrEmpty(options.getBootstrapServers())) { + throw new RuntimeException("Missing --bootstrapServers"); + } + PTransform, PDone> io = KafkaIO.write() .withBootstrapServers(options.getBootstrapServers()) - .withTopic(options.getKafkaTopic()) + .withTopic(options.getKafkaSinkTopic()) .withKeySerializer(LongSerializer.class) .withValueSerializer(StringSerializer.class).values(); formattedResults.apply(queryName + ".WriteKafkaResults", io); @@ -1000,7 +990,6 @@ private PCollection createSource(Pipeline p, final long now) { switch (configuration.sourceType) { case DIRECT: source = sourceEventsFromSynthetic(p); - sinkEventsToKafka(source); break; case AVRO: source = sourceEventsFromAvro(p); diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 77aa085d288d..60c74a2f7e0f 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -401,12 +401,19 @@ public interface NexmarkOptions void setWatermarkValidationDelaySeconds(Long value); - @Description("Base name of Kafka topic in streaming mode.") + @Description("Base name of Kafka source topic in streaming mode.") @Nullable - @Default.String("nexmark") - String getKafkaTopic(); + @Default.String("nexmark-source") + String getKafkaSourceTopic(); + + void setKafkaSourceTopic(String value); + + @Description("Base name of Kafka sink topic in streaming mode.") + @Nullable + @Default.String("nexmark-sink") + String getKafkaSinkTopic(); - void setKafkaTopic(String value); + void setKafkaSinkTopic(String value); @Description("Kafka Bootstrap Server domains.") @Nullable From d38d956657fcec95fb0c5eb17eae1258a64c15f8 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Tue, 17 Oct 2017 03:24:09 -0700 Subject: [PATCH 4/6] address comments --- sdks/java/nexmark/pom.xml | 6 ------ .../java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 8 ++++---- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml index d896e8f35615..73f52ebc8639 100644 --- a/sdks/java/nexmark/pom.xml +++ b/sdks/java/nexmark/pom.xml @@ -265,12 +265,6 @@ org.apache.beam beam-sdks-java-io-kafka - - - org.apache.kafka - kafka-clients - - diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 7311ddba76ec..6160c04a7c98 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -756,8 +756,8 @@ public void processElement(ProcessContext c) { })); } - static final ParDo.SingleOutput EVENT_TO_BYTEARRAY = - ParDo.of(new DoFn() { + static final DoFn EVENT_TO_BYTEARRAY = + new DoFn() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -768,7 +768,7 @@ public void processElement(ProcessContext c) { c.element().toString()); } } - }); + }; /** * Send {@code events} to Kafka. @@ -783,7 +783,7 @@ private void sinkEventsToKafka(PCollection events) { .withTopic(options.getKafkaSinkTopic()) .withKeySerializer(LongSerializer.class) .withValueSerializer(ByteArraySerializer.class).values(); - events.apply("Event to bytes", EVENT_TO_BYTEARRAY).apply(io); + events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)).apply(io); } From f2ad04f423dc73a5200d500f7d976c3478db7b81 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Thu, 9 Nov 2017 14:52:47 -0800 Subject: [PATCH 5/6] address comments 2 --- sdks/java/nexmark/pom.xml | 6 ++++++ .../beam/sdk/nexmark/NexmarkLauncher.java | 18 +++++++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml index 73f52ebc8639..b60fafd4c076 100644 --- a/sdks/java/nexmark/pom.xml +++ b/sdks/java/nexmark/pom.xml @@ -262,6 +262,12 @@ test + + com.google.auto.value + auto-value + provided + + org.apache.beam beam-sdks-java-io-kafka diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 6160c04a7c98..cdf9c2a14933 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.nexmark; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.api.services.bigquery.model.TableFieldSchema; @@ -774,10 +775,6 @@ public void processElement(ProcessContext c) { * Send {@code events} to Kafka. */ private void sinkEventsToKafka(PCollection events) { - if (Strings.isNullOrEmpty(options.getBootstrapServers())) { - throw new RuntimeException("Missing --bootstrapServers"); - } - PTransform, PDone> io = KafkaIO.write() .withBootstrapServers(options.getBootstrapServers()) .withTopic(options.getKafkaSinkTopic()) @@ -787,8 +784,8 @@ private void sinkEventsToKafka(PCollection events) { } - static final ParDo.SingleOutput, Event> BYTEARRAY_TO_EVENT = - ParDo.of(new DoFn, Event>() { + static final DoFn, Event> BYTEARRAY_TO_EVENT = + new DoFn, Event>() { @ProcessElement public void processElement(ProcessContext c) { byte[] encodedEvent = c.element().getValue(); @@ -799,7 +796,7 @@ public void processElement(ProcessContext c) { LOG.error("Error while decoding Event from Kafka message: serialization error"); } } - }); + }; /** * Return source of events from Kafka. @@ -819,7 +816,7 @@ private PCollection sourceEventsFromKafka(Pipeline p) { return p .apply(queryName + ".ReadKafkaEvents", io.withoutMetadata()) - .apply(queryName + ".KafkaToEvents", BYTEARRAY_TO_EVENT); + .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT)); } /** @@ -872,9 +869,8 @@ public void processElement(ProcessContext c) { * Send {@code formattedResults} to Kafka. */ private void sinkResultsToKafka(PCollection formattedResults) { - if (Strings.isNullOrEmpty(options.getBootstrapServers())) { - throw new RuntimeException("Missing --bootstrapServers"); - } + checkArgument(Strings.isNullOrEmpty(options.getBootstrapServers()), + "Missing --bootstrapServers"); PTransform, PDone> io = KafkaIO.write() .withBootstrapServers(options.getBootstrapServers()) From d246c411276a91b4fe9102e9d7793ee2e5df7f9e Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Thu, 9 Nov 2017 22:36:24 -0800 Subject: [PATCH 6/6] changes --- .../main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index cdf9c2a14933..c3807b1b96de 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -869,7 +869,7 @@ public void processElement(ProcessContext c) { * Send {@code formattedResults} to Kafka. */ private void sinkResultsToKafka(PCollection formattedResults) { - checkArgument(Strings.isNullOrEmpty(options.getBootstrapServers()), + checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()), "Missing --bootstrapServers"); PTransform, PDone> io = KafkaIO.write()