This repository was archived by the owner on Nov 11, 2022. It is now read-only.
6 changes: 6 additions & 0 deletions contrib/kafka/AUTHORS.md
@@ -0,0 +1,6 @@
# Authors of 'kafka' module

The following is the official list of authors for copyright purposes of this community-contributed module.

Workiva
Kevin Sookocheff, kevin [dot] sookocheff [at] workiva [dot] com
7 changes: 7 additions & 0 deletions contrib/kafka/README.md
@@ -0,0 +1,7 @@
# Kafka module

This library provides a streaming Dataflow source for Kafka. It is a work in progress and
is not considered ready for production use.

Dependencies:

- Kafka >= 0.9.0.0
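
A minimal usage sketch based on the sample job included in this change (the class name `KafkaReadExample` and the topic, server, and group id values are placeholders; the import paths assume `KafkaSource` lives alongside `KafkaCheckpoint` in the `contrib.kafka.io` package):

```java
import com.google.cloud.dataflow.contrib.kafka.coders.ConsumerRecordCoder;
import com.google.cloud.dataflow.contrib.kafka.io.KafkaSource;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaReadExample {
  public static void main(String[] args) throws Exception {
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
    options.setStreaming(true); // KafkaSource is an unbounded (streaming) source.

    Pipeline pipeline = Pipeline.create(options);
    // Tell the pipeline how to encode Kafka ConsumerRecord elements.
    pipeline.getCoderRegistry().registerCoder(ConsumerRecord.class, ConsumerRecordCoder.class);

    // Produces an unbounded PCollection<ConsumerRecord<String, String>>.
    pipeline.apply(KafkaSource.readFrom("my-topic", "localhost:9092", "my-group"));

    pipeline.run();
  }
}
```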
@@ -0,0 +1,137 @@
/**
* Sample job that reads from a Kafka topic and submits to BigQuery.
* This job will form the basis of a streaming router that pulls messages
* from Kafka and writes them to downstream systems.
*/

package com.google.cloud.dataflow.contrib.kafka;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.contrib.kafka.coders.ConsumerRecordCoder;
import com.google.cloud.dataflow.contrib.kafka.io.KafkaSource;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.joda.time.Duration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


/**
* Echo messages from Kafka to BigQuery.
*/
public class KafkaStreamingRouter {

static final int WINDOW_SIZE = 1; // Default window duration in minutes

/**
* Prepares the data for writing to BigQuery by building a TableRow object.
*/
static class FormatMessageFn extends DoFn<ConsumerRecord<String, String>, TableRow> {
private static final long serialVersionUID = 1L;

@Override
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("topic", c.element().topic())
.set("offset", c.element().offset())
.set("partition", c.element().partition())
.set("key", c.element().key())
.set("value", c.element().value());

c.output(row);
}
}

/** A {@link PTransform} that converts Kafka {@code ConsumerRecord}s into BigQuery {@link TableRow}s. */
static class ConsumerRecordToRowConverter
extends PTransform<PCollection<ConsumerRecord<String, String>>, PCollection<TableRow>> {
private static final long serialVersionUID = 1L;

@Override
public PCollection<TableRow> apply(PCollection<ConsumerRecord<String, String>> messages) {
PCollection<TableRow> results = messages.apply(
ParDo.of(new FormatMessageFn()));

return results;
}

}

/**
* Options supported by {@link KafkaStreamingRouter}.
*
* <p> Inherits standard configuration options.
*/
public interface KafkaStreamingRouterOptions extends DataflowPipelineOptions {
@Description("Kafka servers in the cluster.")
@Default.String("127.0.0.0:9092")
String getBootstrapServers();
void setBootstrapServers(String value);

@Description("The group id of these consumers.")
@Default.String("test")
String getGroupId();
void setGroupId(String value);

@Description("The topic to read.")
@Default.String("test")
String getTopic();
void setTopic(String value);
}

public static void main(String[] args) throws IOException {
KafkaStreamingRouterOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(KafkaStreamingRouterOptions.class);
options.setProject("cloud-project-name");
options.setStreaming(true);
options.setNumWorkers(3);
options.setMaxNumWorkers(3);

String tableSpec = new StringBuilder()
.append(options.getProject()).append(":")
.append("kafkastreamingrouter").append(".")
.append("output")
.toString();

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("topic").setType("STRING"));
fields.add(new TableFieldSchema().setName("offset").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("partition").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("key").setType("STRING"));
fields.add(new TableFieldSchema().setName("value").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);

Pipeline pipeline = Pipeline.create(options);

CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoder(ConsumerRecord.class, ConsumerRecordCoder.class);

pipeline
.apply(KafkaSource.readFrom(options.getTopic(), options.getBootstrapServers(), options.getGroupId()))
.apply(Window.<ConsumerRecord<String,String>>into(FixedWindows.of(
Duration.standardMinutes(WINDOW_SIZE))))
.apply(new ConsumerRecordToRowConverter())
.apply(BigQueryIO.Write.to(tableSpec)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run();
}
}
@@ -0,0 +1,52 @@
package com.google.cloud.dataflow.contrib.kafka.coders;

import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.common.base.Charsets;
import com.google.common.io.CharStreams;
import com.google.gson.Gson;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.codehaus.jackson.annotate.JsonCreator;

/**
* A {@code ConsumerRecordCoder} encodes Kafka ConsumerRecord objects.
*/
public class ConsumerRecordCoder extends CustomCoder<ConsumerRecord<String, String>> {
private static final long serialVersionUID = 1L;


@JsonCreator
public static ConsumerRecordCoder of() {
return INSTANCE;
}

private static final ConsumerRecordCoder INSTANCE = new ConsumerRecordCoder();

private ConsumerRecordCoder() {}

@Override
public void encode(ConsumerRecord<String, String> value, OutputStream outStream, Context context)
throws IOException {
Gson gson = new Gson();
String json = gson.toJson(value);
outStream.write(json.getBytes(Charsets.UTF_8));
Contributor: Might consider using versions that write to / read from streams directly, e.g., https://sites.google.com/site/gson/streaming

(We actually use Jackson in the Dataflow core SDK; not sure about the pros/cons of Gson.)

}

@Override
public ConsumerRecord<String, String> decode(InputStream inStream, Context context)
throws IOException {
String json;
try (InputStreamReader reader = new InputStreamReader(inStream, Charsets.UTF_8)){
json = CharStreams.toString(reader);
}
Gson gson = new Gson();
@SuppressWarnings("unchecked")
ConsumerRecord<String, String> record = gson.fromJson(json, ConsumerRecord.class);
return record;
}
}
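
Following up on the streaming suggestion above, here is a minimal sketch of encode/decode logic that uses Gson's `JsonWriter`/`JsonReader` against the streams directly instead of buffering the whole JSON string. The `StreamingGsonHelper` name is hypothetical and this is illustrative, not part of the submitted change:

```java
import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.Type;

/** Sketch: stream ConsumerRecord JSON to/from the coder's streams without an intermediate String. */
class StreamingGsonHelper {
  private static final Gson GSON = new Gson();
  private static final Type RECORD_TYPE =
      new TypeToken<ConsumerRecord<String, String>>() {}.getType();

  static void encode(ConsumerRecord<String, String> value, OutputStream outStream)
      throws IOException {
    // Write JSON directly to the output stream; flush but do not close it,
    // because the Coder contract leaves ownership of the stream with the caller.
    JsonWriter writer = new JsonWriter(new OutputStreamWriter(outStream, Charsets.UTF_8));
    GSON.toJson(value, RECORD_TYPE, writer);
    writer.flush();
  }

  static ConsumerRecord<String, String> decode(InputStream inStream) throws IOException {
    // Read JSON directly from the input stream.
    JsonReader reader = new JsonReader(new InputStreamReader(inStream, Charsets.UTF_8));
    return GSON.fromJson(reader, RECORD_TYPE);
  }
}
```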
@@ -0,0 +1,104 @@
package com.google.cloud.dataflow.contrib.kafka.io;

import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.common.collect.Maps;

/**
* A marker representing the progress and state of the {@link UnboundedReader}.
*
* Starts a separate thread to read messages from Kafka to populate a {@link
* LinkedBlockingQueue}. As messages are read, the furthest processed offset is
* tracked. When messages have been durably committed by Dataflow, the furthest
* offset read is written to Kafka.
*
* May return duplicate messages if a crash occurs before the read offsets are committed.
*/
public class KafkaCheckpoint implements UnboundedSource.CheckpointMark {

private static final Logger LOG = LoggerFactory.getLogger(KafkaCheckpoint.class);

private final LinkedBlockingQueue<ConsumerRecord<String, String>> recordQueue;
private final ConcurrentMap<TopicPartition, OffsetAndMetadata> furthestCommitted;
private Map<TopicPartition, OffsetAndMetadata> furthestRead;
private ConsumerRecord<String, String> current;
private KafkaProducer producer;

KafkaCheckpoint(String bootstrapServers, String groupId, String topic) {
recordQueue = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
furthestCommitted = Maps.newConcurrentMap();
furthestRead = Maps.newHashMap();

KafkaProducer.Builder builder = new KafkaProducer.Builder();
producer = builder
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withTopic(topic)
.withQueue(recordQueue)
.withCommitted(furthestCommitted)
.build();

new Thread(producer).start();
}

/**
* Commit the furthest read offsets to the Kafka cluster.
*/
@Override
public void finalizeCheckpoint() {
furthestCommitted.clear();
furthestCommitted.putAll(furthestRead);

producer.commit();

furthestRead = Maps.newHashMap();
}

/**
* Advance to the next available record. Returns true if a record was available,
* false otherwise.
*/
public boolean advance() {
current = this.recordQueue.poll();
if (current == null) {
return false;
}

TopicPartition p = new TopicPartition(current.topic(), current.partition());
OffsetAndMetadata offset = new OffsetAndMetadata(current.offset() + 1);
furthestRead.put(p, offset);

return true;
}

/**
* Return the current record.
*/
public ConsumerRecord<String, String> current() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}

LOG.info("Current Record: topic={}, partition={}, offset={}, value={}",
current.topic(), current.partition(), current.offset(), current.value());

Contributor: Drop to debug or lower? Wrap in if (LOG.isInfoEnabled()) {}? Logging every element will typically destroy pipeline performance.
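
A minimal sketch of the guarded, lower-level logging suggested in the comment above (a fragment reusing `LOG` and `current` from this method; not part of the submitted change):

```java
if (LOG.isDebugEnabled()) {
  LOG.debug("Current Record: topic={}, partition={}, offset={}, value={}",
      current.topic(), current.partition(), current.offset(), current.value());
}
```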

return current;
}

/**
* Shut down the producer thread.
*/
public void shutdown() {
producer.shutdown();
}
}
Contributor: This is a really interesting way to design the checkpoint. Are there any guarantees that when the checkpoint is being serialized it will be a "consistent" checkpoint, or will Kafka be able to insert records into the different fields willy-nilly?

Contributor: (In other words, is the finalizeCheckpoint() function atomic?)

Contributor Author: To be honest, I'm not 100% happy with this design. There is probably a better way to manage the concurrent access to the Kafka thread; I just haven't found it yet.

The limitation of this design is that you may get duplicate records if a crash happens before committing. Otherwise it seems to be safe.
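
For reference, a minimal sketch of one way to give `finalizeCheckpoint()` a consistent view of the offsets: have the reader thread and the checkpoint path share a lock and hand out an immutable snapshot. The `OffsetTracker` class is hypothetical and not part of this change:

```java
import com.google.common.collect.ImmutableMap;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

/** Tracks furthest-read offsets so a checkpoint can take a consistent snapshot. */
class OffsetTracker {
  private final Object lock = new Object();
  private Map<TopicPartition, OffsetAndMetadata> furthestRead = new HashMap<>();

  /** Called from advance() for every record handed to the pipeline. */
  void markRead(TopicPartition partition, OffsetAndMetadata offset) {
    synchronized (lock) {
      furthestRead.put(partition, offset);
    }
  }

  /**
   * Called when a checkpoint is created: atomically freezes the offsets read so far
   * and starts a fresh map, so finalizeCheckpoint() commits exactly this snapshot.
   */
  Map<TopicPartition, OffsetAndMetadata> snapshotAndReset() {
    synchronized (lock) {
      Map<TopicPartition, OffsetAndMetadata> snapshot = ImmutableMap.copyOf(furthestRead);
      furthestRead = new HashMap<>();
      return snapshot;
    }
  }
}
```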
