diff --git a/contrib/examples/kafka/pom.xml b/contrib/examples/kafka/pom.xml new file mode 100644 index 0000000000..3355020b62 --- /dev/null +++ b/contrib/examples/kafka/pom.xml @@ -0,0 +1,189 @@ + + + + 4.0.0 + + com.google.cloud.dataflow + google-cloud-dataflow-java-contrib-kafka-examples + Google Cloud Dataflow Kafka Examples + Examples apps using Kafka Source in Google Cloud Dataflow + 0.0.1-SNAPSHOT + jar + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.7 + 1.7 + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.12 + + + com.puppycrawl.tools + checkstyle + 6.6 + + + + ../../../checkstyle.xml + true + true + true + + + + + check + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.4 + + + attach-sources + compile + + jar + + + + attach-test-sources + test-compile + + test-jar + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.3 + + Google Cloud Dataflow Kafka Contrib + Google Cloud Dataflow Kafka Contrib + + com.google.cloud.dataflow.contrib.kafka + false + ]]> + + + + https://cloud.google.com/dataflow/java-sdk/JavaDoc/ + ${basedir}/../../javadoc/dataflow-sdk-docs + + + http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/ + ${basedir}/../../javadoc/guava-docs + + + + + + + jar + + package + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + package + + shade + + + ${project.artifactId}-bundled-${project.version} + + + *:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + + + com.google.cloud.dataflow + google-cloud-dataflow-java-contrib-kafka + ${project.version} + + + org.slf4j + slf4j-api + 1.7.7 + + + org.slf4j + slf4j-jdk14 + 1.7.7 + runtime + + + diff --git a/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java b/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java new file mode 100644 index 0000000000..2a575f1d8b --- /dev/null +++ b/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java @@ -0,0 +1,265 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.contrib.kafka.examples; + +import com.google.cloud.dataflow.contrib.kafka.KafkaIO; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation.Required; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.Top; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * This Dataflow app show cases {@link KafkaIO}. The application reads from a Kafka topic + * containing JSON Tweets, calculates top + * hashtags in 10 minute window. The results are written back to a Kafka topic. + * + *
+ * <pre>{@code
+ * Usage:
+ *   $ java -cp jar_with_dependencies.jar                                           \
+ *          com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample     \
+ *          --project=GCP_PROJECT                                                   \
+ *          --stagingLocation=GS_STAGING_DIRECTORY                                  \
+ *          --runner=BlockingDataflowPipelineRunner                                 \
+ *          --bootstrapServers="kafka_server_1:9092"                                \
+ *          --topics="sample_tweets_json"                                           \
+ *          --outputTopic="top_hashtags"
+ * }</pre>
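+ *
+ * Each result written to the output topic is a JSON string. An illustrative example of the
+ * output format (the values below are hypothetical):
+ *
+ * <pre>{@code
+ * {
+ *   "windowStart": "2015-11-01 10:00:00",
+ *   "windowEnd": "2015-11-01 10:10:00",
+ *   "generatedAt": "2015-11-01 10:10:05",
+ *   "topHashtags": [{"hashtag": "dataflow", "count": 42}, {"hashtag": "kafka", "count": 37}]
+ * }
+ * }</pre>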
+ */ +public class TopHashtagsExample { + + private static final Logger LOG = LoggerFactory.getLogger(TopHashtagsExample.class); + + /** + * Options for the app. + */ + public static interface Options extends PipelineOptions { + @Description("Sliding window length in minutes") + @Default.Integer(10) + Integer getSlidingWindowLengthMinutes(); + void setSlidingWindowLengthMinutes(Integer value); + + @Description("Trigger window interval in minutes") + @Default.Integer(1) + Integer getSlidingWindowIntervalMinutes(); + void setSlidingWindowIntervalMinutes(Integer value); + + @Description("Bootstrap Server(s) for Kafka") + @Required + String getBootstrapServers(); + void setBootstrapServers(String servers); + + @Description("One or more comma separated topics to read from") + @Required + List getTopics(); + void setTopics(List topics); + + @Description("Number of Top Hashtags to track") + @Default.Integer(10) + Integer getNumTopHashtags(); + void setNumTopHashtags(Integer count); + + @Description("Kafka topic name for writing results") + @Required + String getOutputTopic(); + void setOutputTopic(String topic); + } + + public static void main(String args[]) { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + Pipeline pipeline = Pipeline.create(options); + + pipeline + .apply(KafkaIO.read() + .withBootstrapServers(options.getBootstrapServers()) + .withTopics(options.getTopics()) + .withValueCoder(StringUtf8Coder.of()) + .withTimestampFn(TWEET_TIMESTAMP_OR_NOW) + .withoutMetadata()) + .apply(Values.create()) + .apply(ParDo.of(new ExtractHashtagsFn())) + .apply(Window.into(SlidingWindows + .of(Duration.standardMinutes(options.getSlidingWindowLengthMinutes())) + .every(Duration.standardMinutes(options.getSlidingWindowIntervalMinutes())))) + .apply(Count.perElement()) + .apply(Top.of(options.getNumTopHashtags(), new KV.OrderByValue()) + .withoutDefaults()) + .apply(ParDo.of(new OutputFormatter())) + .apply(ParDo.of(new KafkaWriter(options))); + + pipeline.run(); + } + + // The rest of the file implements DoFns to do the following: + // - extract hashtags + // - format results in json + // - write the results back to Kafka (useful for fetching monitoring the end result). + + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + + /** + * Emit hashtags in the tweet (if any). + */ + private static class ExtractHashtagsFn extends DoFn { + + @Override + public void processElement(ProcessContext ctx) throws Exception { + for (JsonNode hashtag : JSON_MAPPER.readTree(ctx.element()) + .with("entities") + .withArray("hashtags")) { + ctx.output(hashtag.get("text").asText()); + } + } + } + + // extract timestamp from "timestamp_ms" field. + private static final SerializableFunction, Instant> TWEET_TIMESTAMP_OR_NOW = + new SerializableFunction, Instant>() { + @Override + public Instant apply(KV kv) { + try { + long tsMillis = JSON_MAPPER.readTree(kv.getValue()).path("timestamp_ms").asLong(); + return tsMillis == 0 ? 
Instant.now() : new Instant(tsMillis); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; + + // return json string containing top hashtags and window information time + private static class OutputFormatter extends DoFn>, String> + implements DoFn.RequiresWindowAccess { + + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat + .forPattern("yyyy-MM-dd HH:mm:ss") + .withZoneUTC(); + private static final ObjectWriter JSON_WRITER = new ObjectMapper() + .writerWithType(OutputJson.class); + + static class OutputJson { + @JsonProperty String windowStart; + @JsonProperty String windowEnd; + @JsonProperty String generatedAt; + @JsonProperty List topHashtags; + + OutputJson(String windowStart, String windowEnd, + String generatedAt, List topHashtags) { + this.windowStart = windowStart; + this.windowEnd = windowEnd; + this.generatedAt = generatedAt; + this.topHashtags = topHashtags; + } + } + + static class HashtagInfo { + @JsonProperty final String hashtag; + @JsonProperty final long count; + HashtagInfo(String hashtag, long count) { + this.hashtag = hashtag; + this.count = count; + } + } + + @Override + public void processElement(ProcessContext ctx) throws Exception { + + List topHashtags = new ArrayList<>(ctx.element().size()); + + for (KV tag : ctx.element()) { + topHashtags.add(new HashtagInfo(tag.getKey(), tag.getValue())); + } + + IntervalWindow window = (IntervalWindow) ctx.window(); + + String json = JSON_WRITER.writeValueAsString(new OutputJson( + DATE_FORMATTER.print(window.start()), + DATE_FORMATTER.print(window.end()), + DATE_FORMATTER.print(Instant.now()), + topHashtags)); + + ctx.output(json); + } + } + + private static class KafkaWriter extends DoFn { + + private final String topic; + private final Map config; + private static transient KafkaProducer producer = null; + + public KafkaWriter(Options options) { + this.topic = options.getOutputTopic(); + this.config = ImmutableMap.of( + "bootstrap.servers", options.getBootstrapServers(), + "key.serializer", StringSerializer.class.getName(), + "value.serializer", StringSerializer.class.getName()); + } + + @Override + public void startBundle(Context c) throws Exception { + if (producer == null) { // in Beam, startBundle might be called multiple times. 
+ producer = new KafkaProducer(config); + } + } + + @Override + public void finishBundle(Context c) throws Exception { + producer.flush(); + } + + @Override + public void processElement(ProcessContext ctx) throws Exception { + LOG.trace("Top Hashtags : {}", ctx.element()); + producer.send(new ProducerRecord(topic, ctx.element())); + } + } +} diff --git a/contrib/kafka/pom.xml b/contrib/kafka/pom.xml new file mode 100644 index 0000000000..7fe8165bc8 --- /dev/null +++ b/contrib/kafka/pom.xml @@ -0,0 +1,176 @@ + + + + 4.0.0 + + com.google.cloud.dataflow + google-cloud-dataflow-java-contrib-kafka + Google Cloud Dataflow Kafka Connectors + Dataflow Library to read Kafka topics + 0.0.1-SNAPSHOT + jar + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + [1.2.0,2.0.0) + 1.3 + 4.11 + 1.7.7 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.7 + 1.7 + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.12 + + + com.puppycrawl.tools + checkstyle + 6.6 + + + + ../../checkstyle.xml + true + true + true + + + + + check + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.4 + + + attach-sources + compile + + jar + + + + attach-test-sources + test-compile + + test-jar + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.3 + + Google Cloud Dataflow Kafka Contrib + Google Cloud Dataflow Kafka Contrib + + com.google.cloud.dataflow.contrib.kafka + false + ]]> + + + + https://cloud.google.com/dataflow/java-sdk/JavaDoc/ + ${basedir}/../../javadoc/dataflow-sdk-docs + + + http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/ + ${basedir}/../../javadoc/guava-docs + + + + + + + jar + + package + + + + + + + + + com.google.cloud.dataflow + google-cloud-dataflow-java-sdk-all + ${google-cloud-dataflow-version} + + + + org.apache.kafka + kafka-clients + [0.9,) + + + + + org.hamcrest + hamcrest-all + ${hamcrest.version} + test + + + + junit + junit + ${junit.version} + test + + + + org.slf4j + slf4j-jdk14 + ${slf4j.version} + test + + + diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java new file mode 100644 index 0000000000..9b33ee809c --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.kafka; + +import com.google.cloud.dataflow.sdk.coders.DefaultCoder; +import com.google.cloud.dataflow.sdk.coders.SerializableCoder; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; + +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Checkpoint for an unbounded KafkaIO.Read. 
Consists of Kafka topic name, partition id, + * and the latest offset consumed so far. + */ +@DefaultCoder(SerializableCoder.class) +public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { + + private final List partitions; + + public KafkaCheckpointMark(List partitions) { + this.partitions = partitions; + } + + public List getPartitions() { + return partitions; + } + + @Override + public void finalizeCheckpoint() throws IOException { + /* nothing to do */ + + // We might want to support committing offset in Kafka for better resume point when the job + // is restarted (checkpoint is not available for job restarts). + } + + /** + * A tuple to hold topic, partition, and offset that comprise the checkpoint + * for a single partition. + */ + public static class PartitionMark implements Serializable { + private final TopicPartition topicPartition; + private final long offset; + + public PartitionMark(TopicPartition topicPartition, long offset) { + this.topicPartition = topicPartition; + this.offset = offset; + } + + public TopicPartition getTopicPartition() { + return topicPartition; + } + + public long getOffset() { + return offset; + } + } +} + diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java new file mode 100644 index 0000000000..ad254ee735 --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java @@ -0,0 +1,1053 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.contrib.kafka; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark.PartitionMark; +import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.SerializableCoder; +import com.google.cloud.dataflow.sdk.io.Read.Unbounded; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark; +import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.util.ExposedByteArrayInputStream; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +/** + * An unbounded source for Kafka topics. Kafka version 0.9 + * and above are supported. + * + *

+ * <h3>Reading from Kafka topics</h3>

+ * + *

+ * <p>KafkaIO source returns an unbounded collection of Kafka records as
+ * {@code PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic metadata
+ * such as the topic-partition and offset, along with the key and value associated with a
+ * Kafka record.
+ *

+ * <p>Although most applications consume a single topic, the source can be configured to
+ * consume multiple topics or even a specific set of {@link TopicPartition}s (see the second
+ * snippet below).
+ *

+ * <p>To configure a Kafka source, you must specify at a minimum the Kafka bootstrapServers
+ * and one or more topics to consume. The following example illustrates various options for
+ * configuring the source:
+ *

+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.read()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ *       // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>>
+ *
+ *       // rest of the settings are optional:
+ *
+ *       // set a Coder for Key and Value (note the change to return type)
+ *       .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>>
+ *       .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>>
+ *
+ *       // you can further customize KafkaConsumer used to read the records by adding more
+ *       // settings for ConsumerConfig, e.g.:
+ *       .updateConsumerProperties(ImmutableMap.<String, Object>of("receive.buffer.bytes", 1024 * 1024))
+ *
+ *       // custom function for calculating record timestamp (default is processing time)
+ *       .withTimestampFn(new MyTimestampFunction())
+ *
+ *       // custom function for watermark (default is record timestamp)
+ *       .withWatermarkFn(new MyWatermarkFunction())
+ *
+ *       // finally, if you don't need Kafka metadata, you can drop it
+ *       .withoutMetadata() // PCollection<KV<Long, String>>
+ *    )
+ *    .apply(Values.<String>create()) // PCollection<String>
+ *     ...
+ * }</pre>
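+ *
+ * <p>For example, a minimal sketch of the second case mentioned above, reading an explicit set
+ * of partitions with {@code withTopicPartitions()} instead of whole topics (partition numbers
+ * here are illustrative only):
+ *
+ * <pre>{@code
+ *  pipeline
+ *    .apply(KafkaIO.read()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopicPartitions(ImmutableList.of(
+ *           new TopicPartition("topic_a", 0),
+ *           new TopicPartition("topic_a", 1))))
+ * }</pre>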
+ * + *

+ * <h3>Partition Assignment and Checkpointing</h3>

+ * The Kafka partitions are evenly distributed among splits (workers). Dataflow checkpointing
+ * is fully supported and each split can resume from its previous checkpoint. See
+ * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details
+ * on splits and checkpoint support.
+ *

+ * <p>When the pipeline starts for the first time without any checkpoint, the source starts
+ * consuming from the latest offsets. You can override this behavior to consume from the
+ * beginning by setting the appropriate properties in {@link ConsumerConfig}, through
+ * {@link Read#updateConsumerProperties(Map)}.
+ *
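+ * <p>For example, a minimal sketch (assuming no conflicting consumer settings elsewhere) that
+ * makes a brand-new pipeline start from the earliest available offsets:
+ *
+ * <pre>{@code
+ *  KafkaIO.read()
+ *     .withBootstrapServers("broker_1:9092")
+ *     .withTopics(ImmutableList.of("topic_a"))
+ *     // consume from the beginning when there is no prior checkpoint
+ *     .updateConsumerProperties(
+ *         ImmutableMap.<String, Object>of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
+ * }</pre>
+ *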

+ * <h3>Advanced Kafka Configuration</h3>

+ * KafakIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like + * to enable offset auto commit (for external monitoring or other purposes), you can set + * "group.id", "enable.auto.commit", etc. + */ +public class KafkaIO { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); + + private static class NowTimestampFn implements SerializableFunction { + @Override + public Instant apply(T input) { + return Instant.now(); + } + } + + + /** + * Creates and uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka + * configuration should set with {@link Read#withBootstrapServers(String)} and + * {@link Read#withTopics(List)}. Other optional settings include key and value coders, + * custom timestamp and watermark functions. Additionally, {@link Read#withMetadata()} provides + * access to Kafka metadata for each record (topic name, partition, offset). + */ + public static Read read() { + return new Read( + new ArrayList(), + new ArrayList(), + ByteArrayCoder.of(), + ByteArrayCoder.of(), + Read.KAFKA_9_CONSUMER_FACTORY_FN, + Read.DEFAULT_CONSUMER_PROPERTIES, + Long.MAX_VALUE, + null); + } + + /** + * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more + * information on usage and configuration. + */ + public static class Read extends TypedRead { + + /** + * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}. + */ + public Read withBootstrapServers(String bootstrapServers) { + return updateConsumerProperties( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); + } + + /** + * Returns a new {@link Read} that reads from the topics. All the partitions are from each + * of the topics is read. + * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * of how the partitions are distributed among the splits. + */ + public Read withTopics(List topics) { + checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both"); + + return new Read(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + } + + /** + * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset + * of partitions for one or more topics when (if ever) needed. + * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * of how the partitions are distributed among the splits. + */ + public Read withTopicPartitions(List topicPartitions) { + checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both"); + + return new Read(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + } + + /** + * Returns a new {@link Read} with {@link Coder} for key bytes. + */ + public Read withKeyCoder(Coder keyCoder) { + return new Read(topics, topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + } + + /** + * Returns a new {@link Read} with {@link Coder} for value bytes. + */ + public Read withValueCoder(Coder valueCoder) { + return new Read(topics, topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + } + + /** + * A factory to create Kafka {@link Consumer} from consumer configuration. + * This is useful for supporting another version of Kafka consumer. 
+ * Default is {@link KafkaConsumer}. + */ + public Read withConsumerFactoryFn( + SerializableFunction, Consumer> consumerFactoryFn) { + return new Read(topics, topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + } + + /** + * Update consumer configuration with new properties. + */ + public Read updateConsumerProperties(Map configUpdates) { + for (String key : configUpdates.keySet()) { + checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key), + "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key)); + } + + Map config = new HashMap<>(consumerConfig); + config.putAll(configUpdates); + + return new Read(topics, topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, config, maxNumRecords, maxReadTime); + } + + /** + * Similar to {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded#withMaxNumRecords(long)}. + * Mainly used for tests and demo applications. + */ + public Read withMaxNumRecords(long maxNumRecords) { + return new Read(topics, topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, maxNumRecords, null); + } + + /** + * Similar to + * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. + * Mainly used for tests and demo + * applications. + */ + public Read withMaxReadTime(Duration maxReadTime) { + return new Read(topics, topicPartitions, keyCoder, valueCoder, + consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime); + } + + /////////////////////////////////////////////////////////////////////////////////////// + + private Read( + List topics, + List topicPartitions, + Coder keyCoder, + Coder valueCoder, + SerializableFunction, Consumer> consumerFactoryFn, + Map consumerConfig, + long maxNumRecords, + @Nullable Duration maxReadTime) { + + super(topics, topicPartitions, keyCoder, valueCoder, null, null, + consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + } + + /** + * A set of properties that are not required or don't make sense for our consumer. + */ + private static final Map IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead" + // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : + // lets allow these, applications can have better resume point for restarts. + ); + + // set config defaults + private static final Map DEFAULT_CONSUMER_PROPERTIES = + ImmutableMap.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + + // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required. + // with default value of of 32K, It takes multiple seconds between successful polls. + // All the consumer work is done inside poll(), with smaller send buffer size, it + // takes many polls before a 1MB chunk from the server is fully read. In my testing + // about half of the time select() inside kafka consumer waited for 20-30ms, though + // the server had lots of data in tcp send buffers on its side. Compared to default, + // this setting increased throughput increased by many fold (3-4x). + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024, + + // default to latest offset when we are not resuming. + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest", + // disable auto commit of offsets. we don't require group_id. could be enabled by user. 
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + // default Kafka 0.9 Consumer supplier. + private static final SerializableFunction, Consumer> + KAFKA_9_CONSUMER_FACTORY_FN = + new SerializableFunction, Consumer>() { + public Consumer apply(Map config) { + return new KafkaConsumer<>(config); + } + }; + } + + /** + * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more + * information on usage and configuration. + */ + public static class TypedRead + extends PTransform>> { + + /** + * A function to assign a timestamp to a record. Default is processing timestamp. + */ + public TypedRead withTimestampFn2( + SerializableFunction, Instant> timestampFn) { + checkNotNull(timestampFn); + return new TypedRead(topics, topicPartitions, keyCoder, valueCoder, + timestampFn, watermarkFn, consumerFactoryFn, consumerConfig, + maxNumRecords, maxReadTime); + } + + /** + * A function to calculate watermark after a record. Default is last record timestamp + * @see #withTimestampFn(SerializableFunction) + */ + public TypedRead withWatermarkFn2( + SerializableFunction, Instant> watermarkFn) { + checkNotNull(watermarkFn); + return new TypedRead(topics, topicPartitions, keyCoder, valueCoder, + timestampFn, watermarkFn, consumerFactoryFn, consumerConfig, + maxNumRecords, maxReadTime); + } + + /** + * A function to assign a timestamp to a record. Default is processing timestamp. + */ + public TypedRead withTimestampFn(SerializableFunction, Instant> timestampFn) { + checkNotNull(timestampFn); + return withTimestampFn2(unwrapKafkaAndThen(timestampFn)); + } + + /** + * A function to calculate watermark after a record. Default is last record timestamp + * @see #withTimestampFn(SerializableFunction) + */ + public TypedRead withWatermarkFn(SerializableFunction, Instant> watermarkFn) { + checkNotNull(watermarkFn); + return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn)); + } + + /** + * Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. + */ + public PTransform>> withoutMetadata() { + return new TypedWithoutMetadata(this); + } + + @Override + public PCollection> apply(PBegin input) { + // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. 
+ Unbounded> unbounded = + com.google.cloud.dataflow.sdk.io.Read.from(makeSource()); + + PTransform>> transform = unbounded; + + if (maxNumRecords < Long.MAX_VALUE) { + transform = unbounded.withMaxNumRecords(maxNumRecords); + } else if (maxReadTime != null) { + transform = unbounded.withMaxReadTime(maxReadTime); + } + + return input.getPipeline().apply(transform); + } + + //////////////////////////////////////////////////////////////////////////////////////// + + protected final List topics; + protected final List topicPartitions; // mutually exclusive with topics + protected final Coder keyCoder; + protected final Coder valueCoder; + @Nullable protected final SerializableFunction, Instant> timestampFn; + @Nullable protected final SerializableFunction, Instant> watermarkFn; + protected final + SerializableFunction, Consumer> consumerFactoryFn; + protected final Map consumerConfig; + protected final long maxNumRecords; // bounded read, mainly for testing + protected final Duration maxReadTime; // bounded read, mainly for testing + + private TypedRead(List topics, + List topicPartitions, + Coder keyCoder, + Coder valueCoder, + @Nullable SerializableFunction, Instant> timestampFn, + @Nullable SerializableFunction, Instant> watermarkFn, + SerializableFunction, Consumer> consumerFactoryFn, + Map consumerConfig, + long maxNumRecords, + @Nullable Duration maxReadTime) { + super("KafkaIO.Read"); + + this.topics = topics; + this.topicPartitions = topicPartitions; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + this.timestampFn = timestampFn; + this.watermarkFn = watermarkFn; + this.consumerFactoryFn = consumerFactoryFn; + this.consumerConfig = consumerConfig; + this.maxNumRecords = maxNumRecords; + this.maxReadTime = maxReadTime; + } + + /** + * Creates an {@link UnboundedSource, ?>} with the configuration in + * {@link TypedRead}. Primary use case is unit tests, should not be used in an + * application. + */ + @VisibleForTesting + UnboundedSource, KafkaCheckpointMark> makeSource() { + return new UnboundedKafkaSource( + -1, + topics, + topicPartitions, + keyCoder, + valueCoder, + timestampFn, + Optional.fromNullable(watermarkFn), + consumerFactoryFn, + consumerConfig); + } + + // utility method to convert KafkRecord to user KV before applying user functions + private static SerializableFunction, OutT> + unwrapKafkaAndThen(final SerializableFunction, OutT> fn) { + return new SerializableFunction, OutT>() { + public OutT apply(KafkaRecord record) { + return fn.apply(record.getKV()); + } + }; + } + } + + /** + * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Typed}, but removes + * Kafka metatdata and returns a {@link PCollection} of {@link KV}. + * See {@link KafkaIO} for more information on usage and configuration of reader. + */ + public static class TypedWithoutMetadata extends PTransform>> { + + private final TypedRead typedRead; + + TypedWithoutMetadata(TypedRead read) { + super("KafkaIO.Read"); + this.typedRead = read; + } + + @Override + public PCollection> apply(PBegin begin) { + return typedRead + .apply(begin) + .apply("Remove Kafka Metadata", + ParDo.of(new DoFn, KV>() { + @Override + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().getKV()); + } + })); + } + } + + /** Static class, prevent instantiation. 
*/ + private KafkaIO() {} + + private static class UnboundedKafkaSource + extends UnboundedSource, KafkaCheckpointMark> { + + private final int id; // split id, mainly for debugging + private final List topics; + private final List assignedPartitions; + private final Coder keyCoder; + private final Coder valueCoder; + private final SerializableFunction, Instant> timestampFn; + // would it be a good idea to pass currentTimestamp to watermarkFn? + private final Optional, Instant>> watermarkFn; + private + SerializableFunction, Consumer> consumerFactoryFn; + private final Map consumerConfig; + + public UnboundedKafkaSource( + int id, + List topics, + List assignedPartitions, + Coder keyCoder, + Coder valueCoder, + @Nullable SerializableFunction, Instant> timestampFn, + Optional, Instant>> watermarkFn, + SerializableFunction, Consumer> consumerFactoryFn, + Map consumerConfig) { + + this.id = id; + this.assignedPartitions = assignedPartitions; + this.topics = topics; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + this.timestampFn = + (timestampFn == null ? new NowTimestampFn>() : timestampFn); + this.watermarkFn = watermarkFn; + this.consumerFactoryFn = consumerFactoryFn; + this.consumerConfig = consumerConfig; + } + + /** + * The partitions are evenly distributed among the splits. The number of splits returned is + * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact + * count. + * + *

It is important to assign the partitions deterministically so that we can support + * resuming a split from last checkpoint. The Kafka partitions are sorted by + * {@code } and then assigned to splits in round-robin order. + */ + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + + List partitions = new ArrayList<>(assignedPartitions); + + // (a) fetch partitions for each topic + // (b) sort by + // (c) round-robin assign the partitions to splits + + if (partitions.isEmpty()) { + try (Consumer consumer = consumerFactoryFn.apply(consumerConfig)) { + for (String topic : topics) { + for (PartitionInfo p : consumer.partitionsFor(topic)) { + partitions.add(new TopicPartition(p.topic(), p.partition())); + } + } + } + } + + Collections.sort(partitions, new Comparator() { + public int compare(TopicPartition tp1, TopicPartition tp2) { + return ComparisonChain + .start() + .compare(tp1.topic(), tp2.topic()) + .compare(tp1.partition(), tp2.partition()) + .result(); + } + }); + + checkArgument(desiredNumSplits > 0); + checkState(partitions.size() > 0, + "Could not find any partitions. Please check Kafka configuration and topic names"); + + int numSplits = Math.min(desiredNumSplits, partitions.size()); + List> assignments = new ArrayList<>(numSplits); + + for (int i = 0; i < numSplits; i++) { + assignments.add(new ArrayList()); + } + for (int i = 0; i < partitions.size(); i++) { + assignments.get(i % numSplits).add(partitions.get(i)); + } + + List> result = new ArrayList<>(numSplits); + + for (int i = 0; i < numSplits; i++) { + List assignedToSplit = assignments.get(i); + + LOG.info("Partitions assigned to split {} (total {}): {}", + i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit)); + + result.add(new UnboundedKafkaSource( + i, + this.topics, + assignedToSplit, + this.keyCoder, + this.valueCoder, + this.timestampFn, + this.watermarkFn, + this.consumerFactoryFn, + this.consumerConfig)); + } + + return result; + } + + @Override + public UnboundedKafkaReader createReader(PipelineOptions options, + KafkaCheckpointMark checkpointMark) { + if (assignedPartitions.isEmpty()) { + LOG.warn("Looks like generateSplits() is not called. Generate single split."); + try { + return new UnboundedKafkaReader( + generateInitialSplits(1, options).get(0), checkpointMark); + } catch (Exception e) { + Throwables.propagate(e); + } + } + return new UnboundedKafkaReader(this, checkpointMark); + } + + @Override + public Coder getCheckpointMarkCoder() { + return SerializableCoder.of(KafkaCheckpointMark.class); + } + + @Override + public boolean requiresDeduping() { + // Kafka records are ordered with in partitions. In addition checkpoint guarantees + // records are not consumed twice. 
+ return false; + } + + @Override + public void validate() { + checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + "Kafka bootstrap servers should be set"); + checkArgument(topics.size() > 0 || assignedPartitions.size() > 0, + "Kafka topics or topic_partitions are required"); + } + + @Override + public Coder> getDefaultOutputCoder() { + return KafkaRecordCoder.of(keyCoder, valueCoder); + } + } + + private static class UnboundedKafkaReader extends UnboundedReader> { + + private final UnboundedKafkaSource source; + private final String name; + private Consumer consumer; + private final List partitionStates; + private KafkaRecord curRecord; + private Instant curTimestamp; + private Iterator curBatch = Collections.emptyIterator(); + + private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); + // how long to wait for new records from kafka consumer inside advance() + private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10); + + // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including + // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout + // like 100 milliseconds does not work well. This along with large receive buffer for + // consumer achieved best throughput in tests (see `defaultConsumerProperties`). + private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor(); + private final SynchronousQueue> availableRecordsQueue = + new SynchronousQueue<>(); + private volatile boolean closed = false; + + // Backlog support : + // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd() + // then look at position(). Use another consumer to do this so that the primary consumer does + // not need to be interrupted. The latest offsets are fetched periodically on another thread. + // This is still a hack. There could be unintended side effects, e.g. if user enabled offset + // auto commit in consumer config, this could interfere with the primary consumer (we will + // handle this particular problem). We might have to make this optional. + private Consumer offsetConsumer; + private final ScheduledExecutorService offsetFetcherThread = + Executors.newSingleThreadScheduledExecutor(); + private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5; + + /** watermark before any records have been read. */ + private static Instant initialWatermark = new Instant(Long.MIN_VALUE); + + public String toString() { + return name; + } + + // maintains state of each assigned partition (buffered records, consumed offset, etc) + private static class PartitionState { + private final TopicPartition topicPartition; + private long consumedOffset; + private long latestOffset; + private Iterator> recordIter = Collections.emptyIterator(); + + // simple moving average for size of each record in bytes + private double avgRecordSize = 0; + private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements + + + PartitionState(TopicPartition partition, long offset) { + this.topicPartition = partition; + this.consumedOffset = offset; + this.latestOffset = -1; + } + + // update consumedOffset and avgRecordSize + void recordConsumed(long offset, int size) { + consumedOffset = offset; + + // this is always updated from single thread. probably not worth making it an AtomicDouble + if (avgRecordSize <= 0) { + avgRecordSize = size; + } else { + // initially, first record heavily contributes to average. 
+ avgRecordSize += ((size - avgRecordSize) / movingAvgWindow); + } + } + + synchronized void setLatestOffset(long latestOffset) { + this.latestOffset = latestOffset; + } + + synchronized long approxBacklogInBytes() { + // Note that is an an estimate of uncompressed backlog. + // Messages on Kafka might be comressed. + if (latestOffset < 0 || consumedOffset < 0) { + return UnboundedReader.BACKLOG_UNKNOWN; + } + if (latestOffset <= consumedOffset || consumedOffset < 0) { + return 0; + } + return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize); + } + } + + public UnboundedKafkaReader( + UnboundedKafkaSource source, + @Nullable KafkaCheckpointMark checkpointMark) { + + this.source = source; + this.name = "Reader-" + source.id; + + partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions, + new Function() { + public PartitionState apply(TopicPartition tp) { + return new PartitionState(tp, -1L); + } + })); + + if (checkpointMark != null) { + // a) verify that assigned and check-pointed partitions match exactly + // b) set consumed offsets + + checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(), + "checkPointMark and assignedPartitions should match"); + // we could consider allowing a mismatch, though it is not expected in current Dataflow + + for (int i = 0; i < source.assignedPartitions.size(); i++) { + PartitionMark ckptMark = checkpointMark.getPartitions().get(i); + TopicPartition assigned = source.assignedPartitions.get(i); + + checkState(ckptMark.getTopicPartition().equals(assigned), + "checkpointed partition %s and assigned partition %s don't match", + ckptMark.getTopicPartition(), assigned); + + partitionStates.get(i).consumedOffset = ckptMark.getOffset(); + } + } + } + + private void consumerPollLoop() { + // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue + while (!closed) { + try { + ConsumerRecords records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); + if (!records.isEmpty()) { + availableRecordsQueue.put(records); // blocks until dequeued. + } + } catch (InterruptedException e) { + LOG.warn("{}: consumer thread is interrupted", this, e); // not expected + break; + } catch (WakeupException e) { + break; + } + } + + LOG.info("{}: Returning from consumer pool loop", this); + } + + private void nextBatch() { + curBatch = Collections.emptyIterator(); + + ConsumerRecords records; + try { + records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("{}: Unexpected", this, e); + return; + } + + if (records == null) { + return; + } + + List nonEmpty = new LinkedList<>(); + + for (PartitionState p : partitionStates) { + p.recordIter = records.records(p.topicPartition).iterator(); + if (p.recordIter.hasNext()) { + nonEmpty.add(p); + } + } + + // cycle through the partitions in order to interleave records from each. 
+ curBatch = Iterators.cycle(nonEmpty); + } + + @Override + public boolean start() throws IOException { + consumer = source.consumerFactoryFn.apply(source.consumerConfig); + consumer.assign(source.assignedPartitions); + + // seek to consumedOffset + 1 if it is set + for (PartitionState p : partitionStates) { + if (p.consumedOffset >= 0) { + LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1); + consumer.seek(p.topicPartition, p.consumedOffset + 1); + } else { + LOG.info("{}: resuming {} at default offset", name, p.topicPartition); + } + } + + // start consumer read loop. + // Note that consumer is not thread safe, should not accessed out side consumerPollLoop() + consumerPollThread.submit( + new Runnable() { + public void run() { + consumerPollLoop(); + } + }); + + // offsetConsumer setup : + + // override client_id and auto_commit so that it does not interfere with main consumer. + String offsetConsumerId = String.format("%s_offset_consumer_%d_%s", name, + (new Random()).nextInt(Integer.MAX_VALUE), + source.consumerConfig.getOrDefault(ConsumerConfig.CLIENT_ID_CONFIG, "none")); + Map offsetConsumerConfig = new HashMap<>(source.consumerConfig); + offsetConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, offsetConsumerId); + offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig); + offsetConsumer.assign(source.assignedPartitions); + + offsetFetcherThread.scheduleAtFixedRate( + new Runnable() { + public void run() { + updateLatestOffsets(); + } + }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); + + return advance(); + } + + @Override + public boolean advance() throws IOException { + /* Read first record (if any). we need to loop here because : + * - (a) some records initially need to be skipped if they are before consumedOffset + * - (b) if curBatch is empty, we want to fetch next batch and then advance. + * - (c) curBatch is an iterator of iterators. we interleave the records from each. + * curBatch.next() might return an empty iterator. + */ + while (true) { + if (curBatch.hasNext()) { + PartitionState pState = curBatch.next(); + + if (!pState.recordIter.hasNext()) { // -- (c) + pState.recordIter = Collections.emptyIterator(); // drop ref + curBatch.remove(); + continue; + } + + ConsumerRecord rawRecord = pState.recordIter.next(); + long consumed = pState.consumedOffset; + long offset = rawRecord.offset(); + + if (consumed >= 0 && offset <= consumed) { // -- (a) + // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10) + // should we check if the offset is way off from consumedOffset (say > 1M)? + LOG.warn("{}: ignoring already consumed offset {} for {}", + this, offset, pState.topicPartition); + continue; + } + + // sanity check + if (consumed >= 0 && (offset - consumed) != 1) { + LOG.warn("{}: gap in offsets for {} after {}. {} records missing.", + this, pState.topicPartition, consumed, offset - consumed - 1); + } + + if (curRecord == null) { + LOG.info("{}: first record offset {}", name, offset); + } + + curRecord = null; // user coders below might throw. + + // apply user coders. might want to allow skipping records that fail to decode. 
+ // TODO: wrap exceptions from coders to make explicit to users + KafkaRecord record = new KafkaRecord( + rawRecord.topic(), + rawRecord.partition(), + rawRecord.offset(), + decode(rawRecord.key(), source.keyCoder), + decode(rawRecord.value(), source.valueCoder)); + + curTimestamp = source.timestampFn.apply(record); + curRecord = record; + + int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + + (rawRecord.value() == null ? 0 : rawRecord.value().length); + pState.recordConsumed(offset, recordSize); + return true; + + } else { // -- (b) + nextBatch(); + + if (!curBatch.hasNext()) { + return false; + } + } + } + } + + private static byte[] nullBytes = new byte[0]; + private static T decode(byte[] bytes, Coder coder) throws IOException { + // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null. + // This makes it impossible for user to distinguish between zero length byte and null. + // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default + // coder. + byte[] toDecode = bytes == null ? nullBytes : bytes; + return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER); + } + + // update latest offset for each partition. + // called from offsetFetcher thread + private void updateLatestOffsets() { + for (PartitionState p : partitionStates) { + try { + offsetConsumer.seekToEnd(p.topicPartition); + long offset = offsetConsumer.position(p.topicPartition); + p.setLatestOffset(offset);; + } catch (Exception e) { + LOG.warn("{}: exception while fetching latest offsets. ignored.", this, e); + p.setLatestOffset(-1L); // reset + } + + LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})", + this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize); + } + + LOG.debug("{}: backlog {}", this, getSplitBacklogBytes()); + } + + @Override + public Instant getWatermark() { + if (curRecord == null) { + LOG.warn("{}: getWatermark() : no records have been read yet.", name); + return initialWatermark; + } + + return source.watermarkFn.isPresent() ? + source.watermarkFn.get().apply(curRecord) : curTimestamp; + } + + @Override + public CheckpointMark getCheckpointMark() { + return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change) + Lists.transform(partitionStates, + new Function() { + public PartitionMark apply(PartitionState p) { + return new PartitionMark(p.topicPartition, p.consumedOffset); + } + } + ))); + } + + @Override + public UnboundedSource, ?> getCurrentSource() { + return source; + } + + @Override + public KafkaRecord getCurrent() throws NoSuchElementException { + // should we delay updating consumed offset till this point? Mostly not required. + return curRecord; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return curTimestamp; + } + + + @Override + public long getSplitBacklogBytes() { + long backlogBytes = 0; + + for (PartitionState p : partitionStates) { + long pBacklog = p.approxBacklogInBytes(); + if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { + return UnboundedReader.BACKLOG_UNKNOWN; + } + backlogBytes += pBacklog; + } + + return backlogBytes; + } + + @Override + public void close() throws IOException { + closed = true; + availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread. 
+ consumer.wakeup(); + consumerPollThread.shutdown(); + offsetFetcherThread.shutdown(); + Closeables.close(offsetConsumer, true); + Closeables.close(consumer, true); + } + } +} diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java new file mode 100644 index 0000000000..584b1b600f --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.kafka; + +import com.google.cloud.dataflow.sdk.values.KV; + +import java.io.Serializable; + +/** + * KafkaRecord contains key and value of the record as well as metadata for the record (topic name, + * partition id, and offset). + */ +public class KafkaRecord implements Serializable { + + private final String topic; + private final int partition; + private final long offset; + private final KV kv; + + public KafkaRecord( + String topic, + int partition, + long offset, + K key, + V value) { + this(topic, partition, offset, KV.of(key, value)); + } + + public KafkaRecord( + String topic, + int partition, + long offset, + KV kv) { + + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.kv = kv; + } + + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + public long getOffset() { + return offset; + } + + public KV getKV() { + return kv; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof KafkaRecord) { + @SuppressWarnings("unchecked") + KafkaRecord other = (KafkaRecord) obj; + return topic.equals(other.topic) + && partition == other.partition + && offset == other.offset + && kv.equals(other.kv); + } else { + return false; + } + } +} diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java new file mode 100644 index 0000000000..d9af1b5659 --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.contrib.kafka; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StandardCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.coders.VarLongCoder; +import com.google.cloud.dataflow.sdk.util.PropertyNames; +import com.google.cloud.dataflow.sdk.values.KV; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +/** + * {@link Coder} for {@link KafkaRecord}. + */ +public class KafkaRecordCoder extends StandardCoder> { + + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final VarLongCoder longCoder = VarLongCoder.of(); + private static final VarIntCoder intCoder = VarIntCoder.of(); + + private final KvCoder kvCoder; + + @JsonCreator + public static KafkaRecordCoder of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List> components) { + KvCoder kvCoder = KvCoder.of(components); + return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()); + } + + public static KafkaRecordCoder of(Coder keyCoder, Coder valueCoder) { + return new KafkaRecordCoder(keyCoder, valueCoder); + } + + public KafkaRecordCoder(Coder keyCoder, Coder valueCoder) { + this.kvCoder = KvCoder.of(keyCoder, valueCoder); + } + + @Override + public void encode(KafkaRecord value, OutputStream outStream, Context context) + throws CoderException, IOException { + Context nested = context.nested(); + stringCoder.encode(value.getTopic(), outStream, nested); + intCoder.encode(value.getPartition(), outStream, nested); + longCoder.encode(value.getOffset(), outStream, nested); + kvCoder.encode(value.getKV(), outStream, nested); + } + + @Override + public KafkaRecord decode(InputStream inStream, Context context) + throws CoderException, IOException { + Context nested = context.nested(); + return new KafkaRecord( + stringCoder.decode(inStream, nested), + intCoder.decode(inStream, nested), + longCoder.decode(inStream, nested), + kvCoder.decode(inStream, nested)); + } + + @Override + public List> getCoderArguments() { + return kvCoder.getCoderArguments(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + kvCoder.verifyDeterministic(); + } + + @Override + public boolean isRegisterByteSizeObserverCheap(KafkaRecord value, Context context) { + return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context); + //TODO : do we have to implement getEncodedSize()? 
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      return new KafkaRecord<Object, Object>(
+          value.getTopic(),
+          value.getPartition(),
+          value.getOffset(),
+          (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
+    }
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return kvCoder.consistentWithEquals();
+  }
+}
diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
new file mode 100644
index 0000000000..b4bdc83ce9
--- /dev/null
+++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
@@ -0,0 +1,378 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.Min;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests of {@link KafkaSource}.
+ */
+@RunWith(JUnit4.class)
+public class KafkaIOTest {
+  /*
+   * The tests below borrow code and structure from CountingSourceTest. In addition, they verify
+   * that the reader interleaves records from multiple partitions.
+   *
+   * Other tests to consider:
+   *   - test KafkaRecordCoder
+   */
+
+  // Creates a mock consumer with records distributed among the given topics, each with the given
+  // number of partitions. Records are assigned in round-robin order among the partitions.
+  private static MockConsumer<byte[], byte[]> mkMockConsumer(
+      List<String> topics, int partitionsPerTopic, int numElements) {
+
+    final List<TopicPartition> partitions = new ArrayList<>();
+    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
+
+    for (String topic : topics) {
+      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
+      for (int i = 0; i < partitionsPerTopic; i++) {
+        partitions.add(new TopicPartition(topic, i));
+        partIds.add(new PartitionInfo(topic, i, null, null, null));
+      }
+      partitionMap.put(topic, partIds);
+    }
+
+    int numPartitions = partitions.size();
+    long[] offsets = new long[numPartitions];
+
+    for (int i = 0; i < numElements; i++) {
+      int pIdx = i % numPartitions;
+      TopicPartition tp = partitions.get(pIdx);
+
+      if (!records.containsKey(tp)) {
+        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
+      }
+      records.get(tp).add(
+          // Note: this interface has changed in 0.10. may get fixed before the release.
+          new ConsumerRecord<byte[], byte[]>(
+              tp.topic(),
+              tp.partition(),
+              offsets[pIdx]++,
+              null, // key
+              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
+    }
+
+    MockConsumer<byte[], byte[]> consumer =
+        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+          // override assign() to add records that belong to the assigned partitions.
+          @Override
+          public void assign(List<TopicPartition> assigned) {
+            super.assign(assigned);
+            for (TopicPartition tp : assigned) {
+              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+                addRecord(r);
+              }
+              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
+              updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
+              seek(tp, 0);
+            }
+          }
+        };
+
+    for (String topic : topics) {
+      consumer.updatePartitions(topic, partitionMap.get(topic));
+    }
+
+    return consumer;
+  }
+
+  private static class ConsumerFactoryFn
+      implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+    private final List<String> topics;
+    private final int partitionsPerTopic;
+    private final int numElements;
+
+    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+      this.topics = topics;
+      this.partitionsPerTopic = partitionsPerTopic;
+      this.numElements = numElements;
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+      return mkMockConsumer(topics, partitionsPerTopic, numElements);
+    }
+  }
+
+  /**
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are (round-robin) assigned across all the 20 partitions.
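+   * Each record's value is its global sequence number (0 .. numElements-1) encoded as a
+   * big-endian long, which is what the BigEndianLongCoder below decodes.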
+   */
+  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
+      int numElements,
+      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
+    private final int num;
+
+    public AssertMultipleOf(int num) {
+      this.num = num;
+    }
+
+    @Override
+    public Void apply(Iterable<Long> values) {
+      for (Long v : values) {
+        assertEquals(0, v % num);
+      }
+      return null;
+    }
+  }
+
+  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+    // Count == numElements
+    DataflowAssert
+        .thatSingleton(input.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Unique count == numElements
+    DataflowAssert
+        .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+                            .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Min == 0
+    DataflowAssert
+        .thatSingleton(input.apply("Min", Min.<Long>globally()))
+        .isEqualTo(0L);
+    // Max == numElements-1
+    DataflowAssert
+        .thatSingleton(input.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSource() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceWithExplicitPartitions() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    List<String> topics = ImmutableList.of("test");
+
+    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements / 10);
+
+    PCollection<Long> input = p
+        .apply(reader.withoutMetadata())
+        .apply(Values.<Long>create());
+
+    // assert that every element is a multiple of 5.
+    DataflowAssert
+        .that(input)
+        .satisfies(new AssertMultipleOf(5));
+
+    DataflowAssert
+        .thatSingleton(input.apply(Count.<Long>globally()))
+        .isEqualTo(numElements / 10L);
+
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceTimestamps() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs = input
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
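+    // (ValueAsTimestampFn assigns each record a timestamp equal to its value, so the
+    // element-minus-timestamp difference is expected to be 0 for every element.)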
+    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  private static class RemoveKafkaMetadata
+      extends DoFn<KafkaRecord<byte[], Long>, KV<byte[], Long>> {
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+      ctx.output(ctx.element().getKV());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceSplits() throws Exception {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+    int numSplits = 10;
+
+    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
+        mkKafkaReadTransform(numElements, null).makeSource();
+    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
+        initial.generateInitialSplits(numSplits, p.getOptions());
+    assertEquals("Expected exact splitting", numSplits, splits.size());
+
+    long elementsPerSplit = numElements / numSplits;
+    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
+    PCollectionList<Long> pcollections = PCollectionList.empty(p);
+    for (int i = 0; i < splits.size(); ++i) {
+      pcollections = pcollections.and(
+          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
+           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata()))
+           .apply("collection " + i, Values.<Long>create()));
+    }
+    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp.
+   */
+  private static class ValueAsTimestampFn
+      implements SerializableFunction<KV<byte[], Long>, Instant> {
+    @Override
+    public Instant apply(KV<byte[], Long> input) {
+      return new Instant(input.getValue());
+    }
+  }
+
+  @Test
+  public void testUnboundedSourceCheckpointMark() throws Exception {
+    int numElements = 85; // 85 to make sure some partitions have more records than others.
+
+    // create a single split:
+    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+            .makeSource()
+            .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+            .get(0);
+
+    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
+    final int numToSkip = 3;
+    // advance once:
+    assertTrue(reader.start());
+
+    // Advance the source numToSkip-1 elements and manually save state.
+    for (long l = 0; l < numToSkip - 1; ++l) {
+      assertTrue(reader.advance());
+    }
+
+    // Confirm that we get the expected element in sequence before checkpointing.
+    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
+    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
+
+    // Checkpoint and restart, and confirm that the source continues correctly.
+    KafkaCheckpointMark mark = CoderUtils.clone(
+        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+    reader = source.createReader(null, mark);
+    assertTrue(reader.start());
+
+    // Confirm that we get the next elements in sequence.
+    // This also confirms that the reader interleaves records from all the partitions.
+    for (int i = numToSkip; i < numElements; i++) {
+      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
+      assertEquals(i, reader.getCurrentTimestamp().getMillis());
+      reader.advance();
+    }
+  }
+}
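The KafkaIOTest class comment lists "test KafkaRecordCoder" among tests to consider. One possible shape for such a round-trip test, sketched here for illustration only (it is not part of this patch, assumes StringUtf8Coder and VarLongCoder are added to the test's imports from com.google.cloud.dataflow.sdk.coders, and reuses the CoderUtils.clone() helper the test file already imports):

  @Test
  public void testKafkaRecordCoderRoundTrip() throws Exception {
    // Round-trip a record through the coder; the comparison relies on KafkaRecord.equals().
    KafkaRecordCoder<String, Long> coder =
        KafkaRecordCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
    KafkaRecord<String, Long> record =
        new KafkaRecord<String, Long>("topic_a", 3, 42L, "key", 123L);
    assertEquals(record, CoderUtils.clone(coder, record));
  }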