Merged

78 commits
45ef009
add initial template and dependencies
ilya-kozyrev Sep 25, 2020
321c715
Added flex template creation with metadata support and instructions
Sep 27, 2020
e88fd31
added new gradle modules for templates
ilya-kozyrev Oct 2, 2020
ed9be58
moved metadata to template location, reverted examples build.gradle
ilya-kozyrev Oct 2, 2020
e2a8317
Moved KafkaToPubsub to template, implemented options in separate package
ilya-kozyrev Oct 2, 2020
4ab8e58
Added package-info.java to new packages
ilya-kozyrev Oct 2, 2020
aeb6411
Reverted build.gradle to master branch state
ilya-kozyrev Oct 2, 2020
61bf992
fixed JAVADOC and metadata
ilya-kozyrev Oct 2, 2020
015e6e6
Added the Read Me section with a step-by-step guide
Oct 5, 2020
129f466
Update README.md
AKosolapov Oct 5, 2020
6db17ad
Readme fixes regarding comments
Oct 7, 2020
01e4987
Update README.md
AKosolapov Oct 7, 2020
b6decbd
Update README.md
AKosolapov Oct 7, 2020
8e41d13
Fixed typos in README.md
Oct 7, 2020
369a205
refactored README.md added case to run template locally
ilya-kozyrev Oct 7, 2020
01db016
Update README.md
AKosolapov Oct 7, 2020
5821bdf
fix build script for dataflow in README.md
ilya-kozyrev Oct 8, 2020
98de91f
Added unit test and fixed metadata file
Oct 9, 2020
fc1ed9a
Added Licenses and style fixes
Oct 9, 2020
6e159c7
Added support for retrieving Kafka credentials from HashiCorp Vault s…
Oct 12, 2020
523f796
Updated README.md and metadata with parameters for Vault access; refa…
Oct 13, 2020
e78c02c
Style fix
Oct 13, 2020
1055ec2
Added description for Vault parameters in metadata
Oct 14, 2020
77defc1
FIX trailing whitespaces in README.md
ilya-kozyrev Oct 15, 2020
d6ab0f6
FIX. Blank line contains whitespace README.md
ilya-kozyrev Oct 15, 2020
8881ff3
Update README.md
AKosolapov Oct 15, 2020
a821560
Refactored to examples folder
ilya-kozyrev Oct 16, 2020
c22f110
Added conversion from JSON into PubsubMessage and extracted all trans…
Oct 16, 2020
b56ec7b
Whitespacelint fix
Oct 16, 2020
71308e5
Updated README.md and output formats
Oct 21, 2020
3c218df
Update README.md
AKosolapov Oct 22, 2020
8bdfc34
Update README.md
Oct 22, 2020
72c38e0
Merge pull request #2 from akvelon/Readme
Oct 22, 2020
1d94fcd
Added support for SSL and removed outputFormat option
Oct 22, 2020
a194d54
Added avro usage example
ilya-kozyrev Oct 22, 2020
12c553f
Merge branch 'KafkaToPubsubTemplate' of github.com:akvelon/beam into …
ilya-kozyrev Oct 22, 2020
f754aa8
Added ssl to AVRO reader
ilya-kozyrev Oct 22, 2020
695467a
FIX whitespaces.
ilya-kozyrev Oct 22, 2020
0212b90
added readme/docs regarding of Avro
ilya-kozyrev Oct 23, 2020
0d0c824
README.md and javadoc fixes
Oct 26, 2020
a4fea29
Added Vault's response JSON schema description
Oct 26, 2020
17957b8
Style fix
Oct 28, 2020
a22c696
Merge branch 'master' into KafkaToPubsubTemplate
ilya-kozyrev Oct 28, 2020
b4e7081
Refactoring.
ilya-kozyrev Nov 19, 2020
aa857d7
Fixed ssl parameters
ilya-kozyrev Nov 19, 2020
25efe95
Fixed style
ilya-kozyrev Nov 19, 2020
7fd7ea6
Merge branch 'master' into KafkaToPubsubTemplate
ilya-kozyrev Nov 20, 2020
f300de5
optimize build.gradle
ilya-kozyrev Nov 20, 2020
c0e6ad0
Resolve conversations
ilya-kozyrev Nov 20, 2020
998f4c0
Updated regarding comments and added unit tests
Nov 23, 2020
6a6aa46
README.md update
Nov 23, 2020
0858e47
made Avro class more abstract
ilya-kozyrev Nov 23, 2020
8a3d85d
fix style
ilya-kozyrev Nov 23, 2020
4172c96
fixed review conversation items
ilya-kozyrev Nov 26, 2020
3c63298
fix getting ssl credentials from Vault
ilya-kozyrev Nov 30, 2020
477df3e
FIX add empty && null map validation to sslConfig
ilya-kozyrev Nov 30, 2020
83ff7b7
FIX. remove vault ssl certs parameters
ilya-kozyrev Nov 30, 2020
b1ba8e1
metadata fix
Nov 30, 2020
08bc3fe
Local paths fix for SSL from GCS
Nov 30, 2020
9fb43b6
add new log message to avoid wrong local files usage
ilya-kozyrev Dec 1, 2020
e99a7d3
fix style
ilya-kozyrev Dec 1, 2020
2ae7d02
Moved kafka-to-pubsub to examples/ directory and updated README.md (#6)
ramazan-yapparov Dec 4, 2020
3e558a3
Stylefix
Dec 4, 2020
51ddead
Removed unused file
ramazan-yapparov Dec 7, 2020
e8ca92d
add tbd section for e-2-e tests
ilya-kozyrev Dec 7, 2020
880f7e6
fix styles
ilya-kozyrev Dec 7, 2020
0eba7ba
specifying kafka-clients version
ilya-kozyrev Dec 7, 2020
5919f9b
fix readme
ilya-kozyrev Dec 7, 2020
4372d6f
template -> exmples
ilya-kozyrev Dec 7, 2020
dd85c93
Update examples/kafka-to-pubsub/README.md
ilya-kozyrev Dec 8, 2020
89adf0b
Merge branch 'master' into KafkaToPubsubTemplate
ramazan-yapparov Dec 9, 2020
51182eb
Fixed outdated import
ramazan-yapparov Dec 9, 2020
a5406a5
Moved template to examples/complete
ramazan-yapparov Dec 9, 2020
e7fc00e
Updated paths in readme file
ramazan-yapparov Dec 9, 2020
ac76e73
Updated README.md and javadoc regarding comments
Dec 9, 2020
b0c4428
README.md stylefix
Dec 9, 2020
3931fba
Added link to KafkaToPubsub example into complete/README.md
Dec 9, 2020
21d8b36
Stylefix
Dec 9, 2020
9 changes: 8 additions & 1 deletion examples/java/build.gradle
@@ -18,7 +18,12 @@

import groovy.json.JsonOutput

plugins { id 'org.apache.beam.module' }
plugins {
id 'java'
id 'org.apache.beam.module'
id 'com.github.johnrengelman.shadow'
}

applyJavaNature(
exportJavadoc: false,
automaticModuleName: 'org.apache.beam.examples',
@@ -49,9 +54,11 @@ configurations.sparkRunnerPreCommit {
dependencies {
compile enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
compile library.java.vendored_guava_26_0_jre
compile library.java.kafka_clients
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:extensions:google-cloud-platform-core")
compile project(":sdks:java:io:google-cloud-platform")
compile project(":sdks:java:io:kafka")
compile project(":sdks:java:extensions:ml")
compile library.java.avro
compile library.java.bigdataoss_util
@@ -32,6 +32,11 @@ This directory contains end-to-end example pipelines that perform complex data p
Pub/Sub topic, splits each line into individual words, capitalizes those
words, and writes the output to a BigQuery table.
</li>
<li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsub.java">KafkaToPubsub</a>
&mdash; A streaming pipeline that reads messages from one or more Apache Kafka
topics and writes them to a single topic in Google Cloud Pub/Sub.
</li>
<li><a href="https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java">TfIdf</a>
&mdash; An example that computes a basic TF-IDF search table for a directory or
Cloud Storage prefix. Demonstrates joining data, side inputs, and logging.
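Aside: the KafkaToPubsub example added to the list above boils down to a read-from-Kafka / write-to-Pub/Sub pipeline. Below is a minimal, self-contained sketch of that shape under stated assumptions — the server, topic, and project names are placeholders, and the full example (further down in this diff) additionally handles authentication, SSL, and output formats.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaToPubsubSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(
            "readFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("host1:9092,host2:9092") // placeholder servers
                .withTopics(Arrays.asList("topic-a", "topic-b")) // one or more input topics
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()) // drop Kafka metadata, keep KV<key, value>
        .apply("createValues", Values.create()) // keep only the message payloads
        .apply(
            "writeToPubSub",
            PubsubIO.writeStrings().to("projects/your-project/topics/your-topic"));
    pipeline.run();
  }
}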
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.kafkatopubsub;

/** Constant variables that are used across the template's parts. */
public class KafkaPubsubConstants {

/* Config keywords */
public static final String KAFKA_CREDENTIALS = "kafka";
public static final String SSL_CREDENTIALS = "ssl";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String BUCKET = "bucket";
}
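For orientation: these keys index into the nested credentials map that getKafkaCredentialsFromVault(...) returns in KafkaToPubsub.run (shown below). The following hedged sketch illustrates that shape with placeholder values; the exact Vault response layout is not part of this diff.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.examples.complete.kafkatopubsub.KafkaPubsubConstants;

class CredentialsShapeSketch {
  /** Builds a map of the shape consumed by configureKafka(credentials.get("kafka")). */
  static Map<String, Map<String, String>> exampleCredentials() {
    Map<String, String> kafka = new HashMap<>();
    kafka.put(KafkaPubsubConstants.USERNAME, "kafka-user"); // placeholder SASL/SCRAM user
    kafka.put(KafkaPubsubConstants.PASSWORD, "kafka-password"); // placeholder SASL/SCRAM password

    Map<String, Map<String, String>> credentials = new HashMap<>();
    credentials.put(KafkaPubsubConstants.KAFKA_CREDENTIALS, kafka);
    return credentials;
  }
}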
@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.kafkatopubsub;

import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.configureKafka;
import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.configureSsl;
import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.getKafkaCredentialsFromVault;
import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.isSslSpecified;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClass;
import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClassKafkaAvroDeserializer;
import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
import org.apache.beam.examples.complete.kafkatopubsub.transforms.FormatTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@link KafkaToPubsub} pipeline is a streaming pipeline which ingests data in JSON format from
* Kafka and outputs the resulting records to PubSub. The input topics, the output topic, and the
* bootstrap servers are specified by the user as template parameters. <br>
* Kafka may be configured with the SASL/SCRAM security mechanism; in this case, a Vault secret
* store holding the credentials should be provided. The URL to the credentials and the Vault token
* are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>Kafka bootstrap server(s).
* <li>The Kafka topic(s) exist.
* <li>The PubSub output topic exists.
* <li>(Optional) An existing HashiCorp Vault secret store.
* <li>(Optional) A configured secure SSL connection for Kafka.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Gradle preparation
*
* To run this example, your {@code build.gradle} file should contain the following task
* to execute the pipeline:
* {@code
* task execute (type:JavaExec) {
* main = System.getProperty("mainClass")
* classpath = sourceSets.main.runtimeClasspath
* systemProperties System.getProperties()
* args System.getProperty("exec.args", "").split()
* }
* }
*
* This task allows you to run the pipeline via the following command:
* {@code
* gradle clean execute -DmainClass=org.apache.beam.examples.complete.kafkatopubsub.KafkaToPubsub \
* -Dexec.args="--<argument>=<value> --<argument>=<value>"
* }
*
* # Running the pipeline
* To execute this pipeline, specify the parameters:
*
* - Kafka Bootstrap servers
* - Kafka input topics
* - Pub/Sub output topic
* - Output format
*
* in the following format:
* {@code
* --bootstrapServers=host:port \
* --inputTopics=your-input-topic \
* --outputTopic=projects/your-project-id/topics/your-topic-name \
* --outputFormat=AVRO|PUBSUB
* }
*
* Optionally, to retrieve Kafka credentials for SASL/SCRAM,
* specify a URL to the credentials in HashiCorp Vault and the vault access token:
* {@code
* --secretStoreUrl=http(s)://host:port/path/to/credentials
* --vaultToken=your-token
* }
*
* Optionally, to configure secure SSL connection between the Beam pipeline and Kafka,
* specify the parameters:
* - A path to a truststore file (it can be a local path or a GCS path, which should start with `gs://`)
* - A path to a keystore file (it can be a local path or a GCS path, which should start with `gs://`)
* - Truststore password
* - Keystore password
* - Key password
* {@code
* --truststorePath=path/to/kafka.truststore.jks
* --keystorePath=path/to/kafka.keystore.jks
* --truststorePassword=your-truststore-password
* --keystorePassword=your-keystore-password
* --keyPassword=your-key-password
* }
* By default this will run the pipeline locally with the DirectRunner. To change the runner, specify:
* {@code
* --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
*
* <p><b>Example Avro usage</b>
*
* <pre>
* This template contains an example class to deserialize AVRO records from Kafka and serialize them to AVRO in Pub/Sub.
*
* To adapt this example to your specific case, follow these steps (a hedged sketch of the first
* two steps appears after this file's diff):
* <ul>
* <li> Create your own class describing the AVRO schema, using {@link AvroDataClass} as an example. Define only the fields you need.
* <li> Create your own Avro deserializer class, using {@link AvroDataClassKafkaAvroDeserializer} as an example. Rename it and substitute your own schema class in the type parameters.
* <li> Modify {@link FormatTransform}, passing your schema class and deserializer in the related parameters.
* <li> Modify the write step in {@link KafkaToPubsub} by putting your schema class into the "writeAvrosToPubSub" step.
* </ul>
* </pre>
*/
public class KafkaToPubsub {

/* Logger for class.*/
private static final Logger LOG = LoggerFactory.getLogger(KafkaToPubsub.class);

/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
KafkaToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToPubsubOptions.class);

run(options);
}

/**
* Runs a pipeline which reads messages from Kafka and writes them to Pub/Sub.
*
* @param options arguments to the pipeline
*/
public static PipelineResult run(KafkaToPubsubOptions options) {
// Configure Kafka consumer properties
Map<String, Object> kafkaConfig = new HashMap<>();
Map<String, String> sslConfig = new HashMap<>();
if (options.getSecretStoreUrl() != null && options.getVaultToken() != null) {
Map<String, Map<String, String>> credentials =
getKafkaCredentialsFromVault(options.getSecretStoreUrl(), options.getVaultToken());
kafkaConfig = configureKafka(credentials.get(KafkaPubsubConstants.KAFKA_CREDENTIALS));
} else {
LOG.warn(
"No information to retrieve Kafka credentials was provided. "
+ "Trying to initiate an unauthorized connection.");
}

if (isSslSpecified(options)) {
sslConfig.putAll(configureSsl(options));
} else {
LOG.info(
"No information to retrieve SSL certificate was provided by parameters."
+ "Trying to initiate a plain text connection.");
}

List<String> topicsList = new ArrayList<>(Arrays.asList(options.getInputTopics().split(",")));

checkArgument(
topicsList.size() > 0 && topicsList.get(0).length() > 0,
"inputTopics cannot be an empty string.");

List<String> bootstrapServersList =
new ArrayList<>(Arrays.asList(options.getBootstrapServers().split(",")));

checkArgument(
bootstrapServersList.size() > 0 && bootstrapServersList.get(0).length() > 0,
"bootstrapServers cannot be an empty string.");

// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
LOG.info(
"Starting Kafka-To-PubSub pipeline with parameters bootstrap servers:"
+ options.getBootstrapServers()
+ " input topics: "
+ options.getInputTopics()
+ " output pubsub topic: "
+ options.getOutputTopic());

/*
* Steps:
* 1) Read messages in from Kafka
* 2) Extract values only
* 3) Write successful records to PubSub
*/

if (options.getOutputFormat() == FormatTransform.FORMAT.AVRO) {
pipeline
.apply(
"readAvrosFromKafka",
FormatTransform.readAvrosFromKafka(
options.getBootstrapServers(), topicsList, kafkaConfig, sslConfig))
.apply("createValues", Values.create())
.apply("writeAvrosToPubSub", PubsubIO.writeAvros(AvroDataClass.class));

} else {
pipeline
.apply(
"readFromKafka",
FormatTransform.readFromKafka(
options.getBootstrapServers(), topicsList, kafkaConfig, sslConfig))
.apply("createValues", Values.create())
.apply("writeToPubSub", new FormatTransform.FormatOutput(options));
}

return pipeline.run();
}
}
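Aside on the Avro adaptation steps described in the javadoc above: AvroDataClass and AvroDataClassKafkaAvroDeserializer are referenced but their bodies are not part of this diff. Below is a minimal, hedged sketch of steps 1 and 2 — the names SensorReading and SensorReadingDeserializer are hypothetical, and it assumes the Kafka producer wrote records with Beam's AvroCoder (a Confluent Schema Registry setup would need a registry-aware deserializer instead).

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.kafka.common.serialization.Deserializer;

/** Step 1 (hypothetical): a data class describing the Avro schema, cf. AvroDataClass. */
@DefaultCoder(AvroCoder.class)
class SensorReading {
  String sensorId;
  Double temperature;

  SensorReading() {} // Avro reflection requires a no-arg constructor
}

/** Step 2 (hypothetical): a Kafka deserializer for it, cf. AvroDataClassKafkaAvroDeserializer. */
class SensorReadingDeserializer implements Deserializer<SensorReading> {
  @Override
  public SensorReading deserialize(String topic, byte[] data) {
    try {
      // Assumes the bytes were encoded with the same AvroCoder on the producer side.
      return AvroCoder.of(SensorReading.class).decode(new ByteArrayInputStream(data));
    } catch (IOException e) {
      throw new RuntimeException("Failed to deserialize Avro record", e);
    }
  }
}

Steps 3 and 4 would then substitute these two classes into FormatTransform.readAvrosFromKafka and into PubsubIO.writeAvros(SensorReading.class) in the write step.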