diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index fd2044f652e9..52b96a4ef39e 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -104,9 +104,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -717,11 +717,12 @@ private void sinkEventsToKafka(PCollection events) { .withBootstrapServers(options.getBootstrapServers()) .withTopic(options.getKafkaTopic()) .withValueSerializer(ByteArraySerializer.class) + .withInputTimestamp() .values()); } - static final DoFn, Event> BYTEARRAY_TO_EVENT = - new DoFn, Event>() { + static final DoFn, Event> BYTEARRAY_TO_EVENT = + new DoFn, Event>() { @ProcessElement public void processElement(ProcessContext c) throws IOException { byte[] encodedEvent = c.element().getValue(); @@ -731,20 +732,35 @@ public void processElement(ProcessContext c) throws IOException { }; /** Return source of events from Kafka. */ - private PCollection sourceEventsFromKafka(Pipeline p, final Instant now) { + private PCollection sourceEventsFromKafka(Pipeline p, final Instant start) { checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers"); NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaTopic()); - KafkaIO.Read read = - KafkaIO.read() + KafkaIO.Read read = + KafkaIO.read() .withBootstrapServers(options.getBootstrapServers()) - .withTopic(options.getKafkaTopic()) - .withKeyDeserializer(LongDeserializer.class) + .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(ByteArrayDeserializer.class) - .withStartReadTime(now) + .withStartReadTime(start) .withMaxNumRecords( options.getNumEvents() != null ? options.getNumEvents() : Long.MAX_VALUE); + if (options.getKafkaTopicCreateTimeMaxDelaySec() >= 0) { + read = + read.withCreateTime( + Duration.standardSeconds(options.getKafkaTopicCreateTimeMaxDelaySec())); + } + + if (options.getNumKafkaTopicPartitions() > 0) { + ArrayList partitionArrayList = new ArrayList<>(); + for (int i = 0; i < options.getNumKafkaTopicPartitions(); ++i) { + partitionArrayList.add(new TopicPartition(options.getKafkaTopic(), i)); + } + read = read.withTopicPartitions(partitionArrayList); + } else { + read = read.withTopic(options.getKafkaTopic()); + } + return p.apply(queryName + ".ReadKafkaEvents", read.withoutMetadata()) .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT)); } @@ -802,6 +818,7 @@ private void sinkResultsToKafka(PCollection formattedResults) { .withBootstrapServers(options.getBootstrapServers()) .withTopic(options.getKafkaResultsTopic()) .withValueSerializer(StringSerializer.class) + .withInputTimestamp() .values()); } @@ -1012,7 +1029,8 @@ private PCollection createSource(Pipeline p, final Instant now) throws IO // finished. In other case. when pubSubMode=SUBSCRIBE_ONLY, now should be null and // it will be ignored. source = - sourceEventsFromKafka(p, configuration.pubSubMode == COMBINED ? now : null); + sourceEventsFromKafka( + p, configuration.pubSubMode == COMBINED ? now : Instant.EPOCH); } else { source = sourceEventsFromPubsub(p); } diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 31a459edf736..7fa560759c1a 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -429,6 +429,20 @@ void setPubsubMessageSerializationMethod( void setKafkaTopic(String value); + @Description( + "Number of partitions for Kafka topic in streaming mode. If unspecified, the broker will be queried for all partitions.") + int getNumKafkaTopicPartitions(); + + void setNumKafkaTopicPartitions(int value); + + @Description( + "If non-negative, events from the Kafka topic will get their timestamps from the Kafka createtime, with the maximum delay for" + + "disorder as specified.") + @Default.Integer(60) + int getKafkaTopicCreateTimeMaxDelaySec(); + + void setKafkaTopicCreateTimeMaxDelaySec(int value); + @Description("Base name of Kafka results topic in streaming mode.") @Default.String("nexmark-results") @Nullable