From 243e5df54bd7704d090a429bef442d44409a34f1 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 20 Nov 2020 01:51:34 -0800 Subject: [PATCH] [BEAM-11391] Improve Nexmark Kafka support: track watermarks using Kafka create time, allow specifying the number of partitions instead of requiring the launcher to have access to the Kafka cluster, and remove unneeded deserialization of keys. --- .../beam/sdk/nexmark/NexmarkLauncher.java | 38 ++++++++++++++----- .../beam/sdk/nexmark/NexmarkOptions.java | 14 +++++++ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index fd2044f652e9..52b96a4ef39e 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -104,9 +104,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -717,11 +717,12 @@ private void sinkEventsToKafka(PCollection events) { .withBootstrapServers(options.getBootstrapServers()) .withTopic(options.getKafkaTopic()) .withValueSerializer(ByteArraySerializer.class) + .withInputTimestamp() .values()); } - static final DoFn, Event> BYTEARRAY_TO_EVENT = - new DoFn, Event>() { + static final DoFn, Event> 
BYTEARRAY_TO_EVENT = + new DoFn, Event>() { @ProcessElement public void processElement(ProcessContext c) throws IOException { byte[] encodedEvent = c.element().getValue(); @@ -731,20 +732,35 @@ public void processElement(ProcessContext c) throws IOException { }; /** Return source of events from Kafka. */ - private PCollection sourceEventsFromKafka(Pipeline p, final Instant now) { + private PCollection sourceEventsFromKafka(Pipeline p, final Instant start) { checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers"); NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaTopic()); - KafkaIO.Read read = - KafkaIO.read() + KafkaIO.Read read = + KafkaIO.read() .withBootstrapServers(options.getBootstrapServers()) - .withTopic(options.getKafkaTopic()) - .withKeyDeserializer(LongDeserializer.class) + .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(ByteArrayDeserializer.class) - .withStartReadTime(now) + .withStartReadTime(start) .withMaxNumRecords( options.getNumEvents() != null ? 
options.getNumEvents() : Long.MAX_VALUE); + if (options.getKafkaTopicCreateTimeMaxDelaySec() >= 0) { + read = + read.withCreateTime( + Duration.standardSeconds(options.getKafkaTopicCreateTimeMaxDelaySec())); + } + + if (options.getNumKafkaTopicPartitions() > 0) { + ArrayList partitionArrayList = new ArrayList<>(); + for (int i = 0; i < options.getNumKafkaTopicPartitions(); ++i) { + partitionArrayList.add(new TopicPartition(options.getKafkaTopic(), i)); + } + read = read.withTopicPartitions(partitionArrayList); + } else { + read = read.withTopic(options.getKafkaTopic()); + } + return p.apply(queryName + ".ReadKafkaEvents", read.withoutMetadata()) .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT)); } @@ -802,6 +818,7 @@ private void sinkResultsToKafka(PCollection formattedResults) { .withBootstrapServers(options.getBootstrapServers()) .withTopic(options.getKafkaResultsTopic()) .withValueSerializer(StringSerializer.class) + .withInputTimestamp() .values()); } @@ -1012,7 +1029,8 @@ private PCollection createSource(Pipeline p, final Instant now) throws IO // finished. In other case. when pubSubMode=SUBSCRIBE_ONLY, now should be null and // it will be ignored. source = - sourceEventsFromKafka(p, configuration.pubSubMode == COMBINED ? now : null); + sourceEventsFromKafka( + p, configuration.pubSubMode == COMBINED ? 
now : Instant.EPOCH); } else { source = sourceEventsFromPubsub(p); } diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 31a459edf736..7fa560759c1a 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -429,6 +429,20 @@ void setPubsubMessageSerializationMethod( void setKafkaTopic(String value); + @Description( + "Number of partitions for Kafka topic in streaming mode. If unspecified, the broker will be queried for all partitions.") + int getNumKafkaTopicPartitions(); + + void setNumKafkaTopicPartitions(int value); + + @Description( + "If non-negative, events from the Kafka topic will get their timestamps from the Kafka createtime, with the maximum delay for " + + "disorder as specified.") + @Default.Integer(60) + int getKafkaTopicCreateTimeMaxDelaySec(); + + void setKafkaTopicCreateTimeMaxDelaySec(int value); + @Description("Base name of Kafka results topic in streaming mode.") @Default.String("nexmark-results") @Nullable