From 956e10b8d11518df160a221a4d83a48e25a1cd1b Mon Sep 17 00:00:00 2001 From: Kevin Sookocheff Date: Wed, 20 Jan 2016 09:50:03 -0600 Subject: [PATCH 1/2] Kafka research spike. --- contrib/kafka/AUTHORS.md | 6 + contrib/kafka/README.md | 4 + .../contrib/kafka/KafkaStreamingRouter.java | 137 ++++++++++ .../kafka/coders/ConsumerRecordCoder.java | 52 ++++ .../contrib/kafka/io/KafkaCheckpoint.java | 104 ++++++++ .../contrib/kafka/io/KafkaProducer.java | 212 +++++++++++++++ .../contrib/kafka/io/KafkaSource.java | 251 ++++++++++++++++++ .../kafka/coders/ConsumerRecordCoderTest.java | 65 +++++ .../contrib/kafka/io/KafkaCheckpointTest.java | 66 +++++ .../contrib/kafka/io/KafkaProducerTest.java | 142 ++++++++++ 10 files changed, 1039 insertions(+) create mode 100644 contrib/kafka/AUTHORS.md create mode 100644 contrib/kafka/README.md create mode 100644 contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaStreamingRouter.java create mode 100644 contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoder.java create mode 100644 contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpoint.java create mode 100644 contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducer.java create mode 100644 contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaSource.java create mode 100644 contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoderTest.java create mode 100644 contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpointTest.java create mode 100644 contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducerTest.java diff --git a/contrib/kafka/AUTHORS.md b/contrib/kafka/AUTHORS.md new file mode 100644 index 0000000000..64245a0440 --- /dev/null +++ b/contrib/kafka/AUTHORS.md @@ -0,0 +1,6 @@ +# Authors of 'kafka' module + +The following is the official list of authors for copyright purposes of this community-contributed module. + + Workiva + Kevin Sookocheff, kevin [dot] sookocheff [at] workiva [dot] com diff --git a/contrib/kafka/README.md b/contrib/kafka/README.md new file mode 100644 index 0000000000..d30918bae1 --- /dev/null +++ b/contrib/kafka/README.md @@ -0,0 +1,4 @@ +# Kafka module + +This library provides Dataflow sources for Kafka. It is a work in progress and +is not considered ready for production use. diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaStreamingRouter.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaStreamingRouter.java new file mode 100644 index 0000000000..b7838ec455 --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaStreamingRouter.java @@ -0,0 +1,137 @@ +/** + * Sample job that reads from a Kafka topic and submits to BigQuery. + * This job will form the basis of a streaming router that pulls messages + * from Kafka and writes them to downstream systems. 
+ */ + +package com.google.cloud.dataflow.contrib.kafka; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.dataflow.sdk.coders.CoderRegistry; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.workiva.cloud.dataflow.coders.ConsumerRecordCoder; +import com.workiva.cloud.dataflow.io.KafkaSource; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +/** + * Echo messages from Kafka to BigQuery. + */ +public class KafkaStreamingRouter { + + static final int WINDOW_SIZE = 1; // Default window duration in minutes + + /** + * Prepares the data for writing to BigQuery by building a TableRow object. + */ + static class FormatMessageFn extends DoFn, TableRow> { + private static final long serialVersionUID = 1L; + + @Override + public void processElement(ProcessContext c) { + TableRow row = new TableRow() + .set("topic", c.element().topic()) + .set("offset", c.element().offset()) + .set("partition", c.element().partition()) + .set("value", c.element().value()); + + c.output(row); + } + } + + /** A PTransform DoFn that combines keys and values to one string. */ + static class ConsumerRecordToRowConverter + extends PTransform>, PCollection> { + private static final long serialVersionUID = 1L; + + @Override + public PCollection apply(PCollection> messages) { + PCollection results = messages.apply( + ParDo.of(new FormatMessageFn())); + + return results; + } + + } + + /** + * Options supported by {@link KafkaStreamingRouterOptions}. + * + *
Inherits standard configuration options. + */ + public static interface KafkaStreamingRouterOptions extends DataflowPipelineOptions { + @Description("Kafka servers in the cluster.") + @Default.String("127.0.0.0:9092") + String getBootstrapServers(); + void setBootstrapServers(String value); + + @Description("The group id of these consumers.") + @Default.String("test") + String getGroupId(); + void setGroupId(String value); + + @Description("The topic to read.") + @Default.String("test") + String getTopic(); + void setTopic(String value); + } + + public static void main(String[] args) throws IOException { + KafkaStreamingRouterOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(KafkaStreamingRouterOptions.class); + options.setProject("cloud-project-namew"); + options.setStreaming(true); + options.setNumWorkers(3); + options.setMaxNumWorkers(3); + + String tableSpec = new StringBuilder() + .append(options.getProject()).append(":") + .append("kafkastreamingrouter").append(".") + .append("output") + .toString(); + + List fields = new ArrayList<>(); + fields.add(new TableFieldSchema().setName("topic").setType("STRING")); + fields.add(new TableFieldSchema().setName("offset").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("partition").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("key").setType("STRING")); + fields.add(new TableFieldSchema().setName("value").setType("STRING")); + TableSchema schema = new TableSchema().setFields(fields); + + Pipeline pipeline = Pipeline.create(options); + + CoderRegistry cr = pipeline.getCoderRegistry(); + cr.registerCoder(ConsumerRecord.class, ConsumerRecordCoder.class); + + pipeline + .apply(KafkaSource.readFrom(options.getTopic(), options.getBootstrapServers(), options.getGroupId())) + .apply(Window.>into(FixedWindows.of( + Duration.standardMinutes(WINDOW_SIZE)))) + .apply(new ConsumerRecordToRowConverter()) + .apply(BigQueryIO.Write.to(tableSpec) + .withSchema(schema) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + + pipeline.run(); + } +} diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoder.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoder.java new file mode 100644 index 0000000000..636a4d7d13 --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoder.java @@ -0,0 +1,52 @@ +package com.google.cloud.dataflow.contrib.kafka.coders; + +import com.google.cloud.dataflow.sdk.coders.CustomCoder; +import com.google.common.base.Charsets; +import com.google.common.io.CharStreams; +import com.google.gson.Gson; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.codehaus.jackson.annotate.JsonCreator; + +/** + * A {@code ConsumerRecordCoder} encodes Kafka ConsumerRecord objects. 
+ */ +public class ConsumerRecordCoder extends CustomCoder> { + private static final long serialVersionUID = 1L; + + + @JsonCreator + public static ConsumerRecordCoder of() { + return INSTANCE; + } + + private static final ConsumerRecordCoder INSTANCE = new ConsumerRecordCoder(); + + private ConsumerRecordCoder() {} + + @Override + public void encode(ConsumerRecord value, OutputStream outStream, Context context) + throws IOException { + Gson gson = new Gson(); + String json = gson.toJson(value); + outStream.write(json.getBytes(Charsets.UTF_8)); + } + + @Override + public ConsumerRecord decode(InputStream inStream, Context context) + throws IOException { + String json; + try (InputStreamReader reader = new InputStreamReader(inStream, Charsets.UTF_8)){ + json = CharStreams.toString(reader); + } + Gson gson = new Gson(); + @SuppressWarnings("unchecked") + ConsumerRecord record = gson.fromJson(json, ConsumerRecord.class); + return record; + } +} diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpoint.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpoint.java new file mode 100644 index 0000000000..e662476dbd --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpoint.java @@ -0,0 +1,104 @@ +package com.google.cloud.dataflow.contrib.kafka.io; + +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.common.collect.Maps; + +/** + * A marker representing the progress and state of the {@link UnboundedReader}. + * + * Starts a separate thread to read messages from Kafka to populate a {@link + * LinkedBlockingQueue}. As messages are read, the furthest processed offset is + * tracked. When messages have been durably committed by Dataflow, the furthest + * offset read is written to Kafka. + * + * Will return duplicate messages. + */ +public class KafkaCheckpoint implements UnboundedSource.CheckpointMark { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaCheckpoint.class); + + private final LinkedBlockingQueue> recordQueue; + private final ConcurrentMap furthestCommitted; + private Map furthestRead; + private ConsumerRecord current; + private KafkaProducer producer; + + KafkaCheckpoint(String bootstrapServers, String groupId, String topic) { + recordQueue = new LinkedBlockingQueue>(); + furthestCommitted = Maps.newConcurrentMap(); + furthestRead = Maps.newHashMap(); + + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + KafkaProducer producer = builder + .withBootstrapServers(bootstrapServers) + .withGroupId(groupId) + .withTopic(topic) + .withQueue(recordQueue) + .withCommitted(furthestCommitted) + .build(); + + new Thread(producer).start(); + } + + /** + * Commit the furthest read offsets to the Kafka cluster. + */ + @Override + public void finalizeCheckpoint() { + furthestCommitted.clear(); + furthestCommitted.putAll(furthestRead); + + producer.commit(); + + furthestRead = Maps.newHashMap(); + } + + /** + * Advance to the next available record. Returns true if a record exists. + * False otherwise. 
+ */ + public boolean advance() { + current = this.recordQueue.poll(); + if (current == null) { + return false; + } + + TopicPartition p = new TopicPartition(current.topic(), current.partition()); + OffsetAndMetadata offset = new OffsetAndMetadata(current.offset() + 1); + furthestRead.put(p, offset); + + return true; + } + + /** + * Return the current record. + */ + public ConsumerRecord current() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + + LOG.info("Current Record: topic={}, partition={}, offset={}, value={}", + current.topic(), current.partition(), current.offset(), current.value()); + + return current; + } + + /** + * Shutdown the producer thread. + */ + public void shutdown() { + producer.shutdown(); + } +} diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducer.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducer.java new file mode 100644 index 0000000000..ce392d35a8 --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducer.java @@ -0,0 +1,212 @@ +package com.google.cloud.dataflow.contrib.kafka.io; + +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Produce messages from Kafka for consumption by the KafkaReader. 
+ */ +public class KafkaProducer implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaCheckpoint.class); + private static final Integer DEFAULT_SESSION_TIMEOUT = 30000; + + private final LinkedBlockingQueue> queue; + private final ConcurrentMap committed; + private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean commit = new AtomicBoolean(false); + private final KafkaConsumer consumer; + + private KafkaProducer( + String bootstrapServers, + String groupId, + String topic, + Integer sessionTimeout, + LinkedBlockingQueue> queue, + ConcurrentMap committed, + KafkaConsumer consumer) { + + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + throw new IllegalArgumentException("bootstrapServers can not be null."); + } + + if (groupId == null || groupId.isEmpty()) { + throw new IllegalArgumentException("groupId is required."); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("topic is required."); + } + + if (queue == null) { + throw new IllegalArgumentException("queue is required."); + } + + if (committed == null) { + throw new IllegalArgumentException("committed is required."); + } + + if (sessionTimeout == null) { + sessionTimeout = DEFAULT_SESSION_TIMEOUT; + } + + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("group.id", groupId); + props.put("session.timeout.ms", Integer.toString(sessionTimeout)); + props.put("enable.auto.commit", "false"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + this.queue = queue; + this.committed = committed; + + if (consumer == null) { + this.consumer = new KafkaConsumer(props); + } else { + this.consumer = consumer; + } + this.consumer.subscribe(Lists.newArrayList(topic)); + } + + public static class Builder + { + private String topic; + private String groupId; + private Integer sessionTimeout; + private String bootstrapServers; + private LinkedBlockingQueue> queue; + private ConcurrentMap committed; + private KafkaConsumer consumer; + + public Builder withTopic(final String topic) { + this.topic = topic; + return this; + } + + public Builder withGroupId(final String groupId) { + this.groupId = groupId; + return this; + } + + public Builder withSessionTimeout(final Integer sessionTimeout) { + this.sessionTimeout = sessionTimeout; + return this; + } + + public Builder withBootstrapServers(final String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + return this; + } + + public Builder withQueue(final LinkedBlockingQueue> queue) + { + this.queue = queue; + return this; + } + + public Builder withCommitted(final ConcurrentMap committed) + { + this.committed = committed; + return this; + } + + public Builder withConsumer(final KafkaConsumer consumer) + { + this.consumer = consumer; + return this; + } + + public KafkaProducer build() + { + return new KafkaProducer( + bootstrapServers, + groupId, + topic, + sessionTimeout, + queue, + committed, + consumer); + } + } + + /** + * Poll for messages from Kafka and add to shared queue. Set commit to true to + * commit all offsets specified in committed shared map. 
+ * + */ + public void run() { + try { + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(1000); + for (ConsumerRecord record : records) { + try { + queue.put(record); + } catch (InterruptedException e) { + LOG.error(Throwables.getStackTraceAsString(e)); + } + } + + // Sync all topics with their furthest read offset + if (commit.get()) { + System.out.println(committed); + + // Remove any committed partitions that we are no longer assigned to. + // if we are no longer assigned, a consumer rebalance has shifted + // those partitions to another consumer. Let the other consumer handle + // those messages. + Sets.SetView notAssigned = + Sets.difference(committed.keySet(), consumer.assignment()); + committed.keySet().removeAll(notAssigned); + + for (Map.Entry entry : committed.entrySet()) { + LOG.info("Committing: topic={}, partition={}, offset={}", entry.getKey().topic(), + entry.getKey().partition(), entry.getValue().offset()); + } + + consumer.commitSync(committed); + commit.set(false); + } + } + } catch (ConsumerWakeupException e) { + // Ignore exception if closing + if (!shutdown.get()) throw e; + } finally { + consumer.close(); + } + } + + /** + * Commit the furthest read offset of each partition. If we are no longer + * subscribed to a partition, the offset is not committed. Another consumer + * will process these offsets. Duplicate data may result. + */ + public synchronized void commit() { + commit.set(true); + } + + /** + * Shutdown hook. Closes the consumer. + */ + public synchronized void shutdown() { + shutdown.set(true); + consumer.wakeup(); + } +} diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaSource.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaSource.java new file mode 100644 index 0000000000..aa1760bd0d --- /dev/null +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaSource.java @@ -0,0 +1,251 @@ +package com.google.cloud.dataflow.contrib.kafka.io; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.common.base.Throwables; +import com.workiva.cloud.dataflow.coders.ConsumerRecordCoder; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + + +/** + * An {@code UnboundedSource} for reading a Kafka topic. Requires Kafka 0.9 or + * later. + * + *
<p>To read a {@link com.google.cloud.dataflow.sdk.values.PCollection} of
+ * {@code ConsumerRecord}s from one Kafka topic, use the
+ * {@link KafkaSource#readFrom} convenience method, which returns a read
+ * transform. For example:
+ *
+ * <pre>{@code
+ * PCollection<ConsumerRecord<String, String>> records =
+ *     KafkaSource.readFrom(topic, bootstrapServers, groupId);
+ * }</pre>
+ * + * Limitations: Currently only supports String encoding of Consumer Records. + */ +public class KafkaSource extends UnboundedSource, KafkaCheckpoint> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); + + private static final long serialVersionUID = 0L; + + private final String topic; + private final String bootstrapServers; + private final String groupId; + private final boolean dedup; + + public static Read.Unbounded> readFrom(String topic, + String bootstrapServers, String groupId) { + return Read.from(new KafkaSource(topic, bootstrapServers, groupId, false)); + } + + /** + * Enable deduplication of records. + */ + public KafkaSource withDedup() { + return new KafkaSource(topic, bootstrapServers, groupId, true); + } + + private KafkaSource(String topic, String bootstrapServers, String groupId, boolean dedup) { + this.topic = topic; + this.bootstrapServers = bootstrapServers; + this.groupId = groupId; + this.dedup = dedup; + } + + + /** + * Returns a list of {@code KafkaSource} objects representing the instances of this source + * that should be used when executing the workflow. + * + *
<p>Kafka automatically partitions the data among readers. In this case,
+   * {@code n} identical replicas of the top-level source can be returned.
+   */
+  @Override
+  public List<KafkaSource> generateInitialSplits(int desiredNumSplits, PipelineOptions options) {
+    List<KafkaSource> splits = new ArrayList<>();
+    for (int i = 0; i < desiredNumSplits; i++) {
+      splits.add(new KafkaSource(topic, bootstrapServers, groupId, dedup));
+    }
+    return splits;
+  }
+
+  /**
+   * The {@link KafkaCheckpoint} does not need to be durably committed, so this returns null.
+   */
+  @Override
+  public Coder<KafkaCheckpoint> getCheckpointMarkCoder() {
+    return null;
+  }
+
+  /**
+   * Returns whether this source requires explicit deduping.
+   *
<p>The KafkaCheckpoint will return duplicate records on error. If you
+   * require deduping, use {@code withDedup()} when constructing the KafkaSource.
+   */
+  @Override
+  public boolean requiresDeduping() {
+    return dedup;
+  }
+
+  /**
+   * A {@code Reader} that reads unbounded input from a single Kafka topic.
+   */
+  private class KafkaSourceReader extends UnboundedReader<ConsumerRecord<String, String>> {
+
+    private final KafkaCheckpoint checkpoint;
+
+    KafkaSourceReader(KafkaCheckpoint checkpoint,
+        String bootstrapServers, String groupId, String topic) {
+      if (checkpoint == null) {
+        this.checkpoint = new KafkaCheckpoint(bootstrapServers, groupId, topic);
+      } else {
+        this.checkpoint = checkpoint;
+      }
+    }
+
+    /**
+     * Initializes the reader and advances it to the first record.
+     */
+    @Override
+    public boolean start() {
+      return this.advance();
+    }
+
+    /**
+     * Advances the reader to the next valid record.
+     *
Returns {@code true} if a record was read, {@code false} if there is no more input + * available. Future calls to {@link #advance} may return {@code true} once more data is + * available. + */ + @Override + public boolean advance() { + return this.checkpoint.advance(); + } + + /** + * Returns the data record at the current position, last read by calling start or advance. + */ + @Override + public ConsumerRecord getCurrent() throws NoSuchElementException { + return this.checkpoint.current(); + } + + /** + * Returns a unique identifier for the current record based + * on the current offset, topic and partition. + */ + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + try { + ConsumerRecord record = getCurrent(); + if (record != null) { + return CoderUtils.encodeToByteArray(StringUtf8Coder.of(), record.toString()); + } else { + return new byte[0]; + } + } catch (CoderException e) { + LOG.error(Throwables.getStackTraceAsString(e)); + return new byte[0]; + } + } + + /** + * Returns a lower bound on timestamps of future elements read by this + * reader. + * + * Kafka records do not have a natural timestamp so use clock time instead. + */ + @Override + public Instant getWatermark() { + DateTime dt = new DateTime(DateTimeZone.UTC); + return dt.toInstant(); + } + + /** + * Returns the timestamp associated with the current data item. + * + * Kafka records do not have a natural timestamp so min value instead. + */ + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + ConsumerRecord record = getCurrent(); + if (record == null) { + throw new NoSuchElementException(); + } + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * Returns a {@link KafkaCheckpoint} representing the progress of this {@code UnboundedReader}. + */ + @Override + public CheckpointMark getCheckpointMark() { + return checkpoint; + } + + /** + * Safely close the reader. + */ + @Override + public void close() { + this.checkpoint.shutdown(); + } + + /** + * Return an instance of the current KafkaSource. + */ + @Override + public KafkaSource getCurrentSource() { + return KafkaSource.this; + } + } + + /** + * Create a new UnboundedSource.UnboundedReader to read from this source, + * resuming from the given checkpoint if present. + */ + @Override + public KafkaSourceReader createReader(PipelineOptions options, + @Nullable KafkaCheckpoint checkpoint) { + + return new KafkaSourceReader(checkpoint, this.bootstrapServers, this.groupId, this.topic); + } + + /** + * Checks that this source is valid, before it can be used in a pipeline. + */ + @Override + public void validate() {} + + /** + * Returns the default Coder to use for the data read from this source. 
+ */ + @Override + public Coder> getDefaultOutputCoder() { + return ConsumerRecordCoder.of(); + } +} diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoderTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoderTest.java new file mode 100644 index 0000000000..dbba71b24e --- /dev/null +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/coders/ConsumerRecordCoderTest.java @@ -0,0 +1,65 @@ +package com.workiva.cloud.dataflow.coders; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.util.Serializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test case for {@link ConsumerRecordCoder}. + */ +@RunWith(JUnit4.class) +public class ConsumerRecordCoderTest { + + @Test + public void testDecodeEncodeEqual() throws Exception { + ConsumerRecordCoder coder = ConsumerRecordCoder.of(); + + ConsumerRecord record = + new ConsumerRecord("test", 10, 100, "key", "value"); + + for (Coder.Context context : Arrays.asList(Coder.Context.OUTER, Coder.Context.NESTED)) { + ConsumerRecord value = decodeEncode(coder, context, record); + assertEquals(value.topic(), record.topic()); + assertEquals(value.partition(), record.partition()); + assertEquals(value.offset(), record.offset()); + assertEquals(value.key(), record.key()); + assertEquals(value.value(), record.value()); + } + } + + private static byte[] encode( + Coder coder, Coder.Context context, T value) throws CoderException, IOException { + @SuppressWarnings("unchecked") + Coder deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class); + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + deserializedCoder.encode(value, os, context); + return os.toByteArray(); + } + + private static T decode( + Coder coder, Coder.Context context, byte[] bytes) throws CoderException, IOException { + @SuppressWarnings("unchecked") + Coder deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class); + + ByteArrayInputStream is = new ByteArrayInputStream(bytes); + return deserializedCoder.decode(is, context); + } + + private static T decodeEncode(Coder coder, Coder.Context context, T value) + throws CoderException, IOException { + return decode(coder, context, encode(coder, context, value)); + } +} diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpointTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpointTest.java new file mode 100644 index 0000000000..ee96c146e9 --- /dev/null +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaCheckpointTest.java @@ -0,0 +1,66 @@ +package com.workiva.cloud.dataflow.io; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.util.Serializer; +import com.workiva.cloud.dataflow.coders.ConsumerRecordCoder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import 
java.util.Arrays; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test case for {@link KafkaCheckpoint}. + */ +@RunWith(JUnit4.class) +public class KafkaCheckpointTest { + + @Test + public void testDecodeEncodeEqual() throws Exception { + ConsumerRecordCoder coder = ConsumerRecordCoder.of(); + + ConsumerRecord record = + new ConsumerRecord("test", 10, 100, "key", "value"); + + for (Coder.Context context : Arrays.asList(Coder.Context.OUTER, Coder.Context.NESTED)) { + ConsumerRecord value = decodeEncode(coder, context, record); + assertEquals(value.topic(), record.topic()); + assertEquals(value.partition(), record.partition()); + assertEquals(value.offset(), record.offset()); + assertEquals(value.key(), record.key()); + assertEquals(value.value(), record.value()); + } + } + + private static byte[] encode( + Coder coder, Coder.Context context, T value) throws CoderException, IOException { + @SuppressWarnings("unchecked") + Coder deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class); + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + deserializedCoder.encode(value, os, context); + return os.toByteArray(); + } + + private static T decode( + Coder coder, Coder.Context context, byte[] bytes) throws CoderException, IOException { + @SuppressWarnings("unchecked") + Coder deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class); + + ByteArrayInputStream is = new ByteArrayInputStream(bytes); + return deserializedCoder.decode(is, context); + } + + private static T decodeEncode(Coder coder, Coder.Context context, T value) + throws CoderException, IOException { + return decode(coder, context, encode(coder, context, value)); + } +} diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducerTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducerTest.java new file mode 100644 index 0000000000..b8d445dc5a --- /dev/null +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/io/KafkaProducerTest.java @@ -0,0 +1,142 @@ + +package com.workiva.cloud.dataflow.io; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; + + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.collect.Sets; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test case for {@link KafkaProducer}. 
+ */ +@RunWith(JUnit4.class) +public class KafkaProducerTest { + + @Test(expected=IllegalArgumentException.class) + public void builder_requiresBootstrapId() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + builder.withGroupId("test").withTopic("test").build(); + } + + @Test(expected=IllegalArgumentException.class) + public void builder_requiresOneBootstrapId() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + builder.withBootstrapServers("").withGroupId("test").withTopic("test").build(); + } + + @Test(expected=IllegalArgumentException.class) + public void builder_requiresGroupId() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + builder.withBootstrapServers("localhost:9092").withTopic("test").build(); + } + + @Test(expected=IllegalArgumentException.class) + public void builder_requiresTopic() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + builder.withBootstrapServers("localhost:9092").withGroupId("test").build(); + } + + @Test(expected=IllegalArgumentException.class) + public void builder_requiresQueue() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + builder.withBootstrapServers("localhost:9092") + .withGroupId("test").withTopic("test").build(); + } + + @Test(expected=IllegalArgumentException.class) + public void builder_requiresCommitted() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + builder.withBootstrapServers("localhost:9092") + .withGroupId("test") + .withTopic("test") + .withQueue(Queues.newLinkedBlockingQueue()) + .build(); + } + + @Test + public void builder_constructsProducer() throws Exception { + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + KafkaProducer producer = builder + .withBootstrapServers("localhost:9092") + .withGroupId("test") + .withTopic("test") + .withQueue(Queues.newLinkedBlockingQueue()) + .withCommitted(Maps.newConcurrentMap()) + .build(); + + assertNotNull(producer); + } + + + @Test + public void producesRecordsToSharedQueue() throws Exception { + Map>> recordMap = Maps.newHashMap(); + List> recordList = Lists.newArrayList(); + recordList.add(new ConsumerRecord("topic", 10, 100, "key", "value")); + recordMap.put(new TopicPartition("topic", 0), recordList); + + @SuppressWarnings("unchecked") + KafkaConsumer consumer = mock(KafkaConsumer.class); + + when(consumer.poll(1000)) + .thenReturn(new ConsumerRecords(recordMap)) + .thenReturn(ConsumerRecords.empty()); + + when(consumer.assignment()).thenReturn(Sets.newHashSet(new TopicPartition("topic", 1))); + + LinkedBlockingQueue> queue = Queues.newLinkedBlockingQueue(); + ConcurrentMap committed = Maps.newConcurrentMap(); + committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(9, "")); + + KafkaProducer.Builder builder = new KafkaProducer.Builder(); + KafkaProducer producer = builder + .withBootstrapServers("localhost:9092") + .withGroupId("test") + .withTopic("test") + .withQueue(queue) + .withCommitted(committed) + .withConsumer(consumer) + .build(); + + new Thread(producer).start(); + + producer.commit(); + + ConsumerRecord record = queue.take(); + + // Assigned a different partition, will commit nothing + verify(consumer).commitSync(Maps.newConcurrentMap()); + + assertEquals(record.topic(), "topic"); + assertEquals(record.partition(), 10); + assertEquals(record.offset(), 100); + assertEquals(record.key(), "key"); + assertEquals(record.value(), "value"); + + 
producer.shutdown();
+
+    verify(consumer).close();
+  }
+}

From e5a9893d5bcbd27ecfd819f33d8182c1821def58 Mon Sep 17 00:00:00 2001
From: Kevin Sookocheff
Date: Wed, 20 Jan 2016 09:52:46 -0600
Subject: [PATCH 2/2] Add Kafka version dependency.

---
 contrib/kafka/README.md | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/contrib/kafka/README.md b/contrib/kafka/README.md
index d30918bae1..273c2ecc2b 100644
--- a/contrib/kafka/README.md
+++ b/contrib/kafka/README.md
@@ -1,4 +1,7 @@
 # Kafka module
 
-This library provides Dataflow sources for Kafka. It is a work in progress and
+This library provides a streaming Dataflow source for Kafka. It is a work in progress and
 is not considered ready for production use.
+
+Dependencies:
+Kafka >= 0.9.0.0
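For local experimentation, wiring the source added by these patches into a pipeline might look roughly like the sketch below. This is an illustrative sketch only, not part of the patches: it assumes Dataflow SDK 1.x, a reachable Kafka 0.9 broker, and placeholder topic, bootstrap-server, and group-id values.

```java
// Illustrative sketch only (not part of the patches). Assumes Dataflow SDK 1.x,
// a reachable Kafka 0.9 broker, and placeholder topic/server/group values.
// Package names follow the file locations declared in PATCH 1/2.
import com.google.cloud.dataflow.contrib.kafka.coders.ConsumerRecordCoder;
import com.google.cloud.dataflow.contrib.kafka.io.KafkaSource;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.joda.time.Duration;

public class KafkaSourceExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
    options.setStreaming(true); // KafkaSource is an UnboundedSource.

    Pipeline pipeline = Pipeline.create(options);

    // Register the JSON-based coder for ConsumerRecord, as KafkaStreamingRouter does.
    pipeline.getCoderRegistry().registerCoder(ConsumerRecord.class, ConsumerRecordCoder.class);

    pipeline
        // Read an unbounded stream of ConsumerRecord<String, String> from one topic.
        .apply(KafkaSource.readFrom("test", "127.0.0.1:9092", "test"))
        // Window into fixed one-minute windows before applying aggregations or sinks.
        .apply(Window.<ConsumerRecord<String, String>>into(
            FixedWindows.of(Duration.standardMinutes(1))));
        // ...then apply downstream transforms, e.g. BigQueryIO.Write as in KafkaStreamingRouter.

    pipeline.run();
  }
}
```

The windowed PCollection can then feed BigQueryIO or any other sink, mirroring the KafkaStreamingRouter sample in PATCH 1/2.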