diff --git a/examples/java/build.gradle b/examples/java/build.gradle index c305f0451f09..a1b6827a98ec 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -18,7 +18,12 @@ import groovy.json.JsonOutput -plugins { id 'org.apache.beam.module' } +plugins { + id 'java' + id 'org.apache.beam.module' + id 'com.github.johnrengelman.shadow' +} + applyJavaNature( exportJavadoc: false, automaticModuleName: 'org.apache.beam.examples', @@ -49,9 +54,11 @@ configurations.sparkRunnerPreCommit { dependencies { compile enforcedPlatform(library.java.google_cloud_platform_libraries_bom) compile library.java.vendored_guava_26_0_jre + compile library.java.kafka_clients compile project(path: ":sdks:java:core", configuration: "shadow") compile project(":sdks:java:extensions:google-cloud-platform-core") compile project(":sdks:java:io:google-cloud-platform") + compile project(":sdks:java:io:kafka") compile project(":sdks:java:extensions:ml") compile library.java.avro compile library.java.bigdataoss_util diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/README.md index 3f4842a5c7a7..b74ea44554d5 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/README.md +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/README.md @@ -32,6 +32,11 @@ This directory contains end-to-end example pipelines that perform complex data p Pub/Sub topic, splits each line into individual words, capitalizes those words, and writes the output to a BigQuery table. +
• KafkaToPubsub
+ — A streaming pipeline example that reads data
+ from one or more topics in Apache Kafka and writes it into a single topic
+ in Google Cloud Pub/Sub.
+
  • TfIdf — An example that computes a basic TF-IDF search table for a directory or Cloud Storage prefix. Demonstrates joining data, side inputs, and logging. diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaPubsubConstants.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaPubsubConstants.java new file mode 100644 index 000000000000..46f021a5ca75 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaPubsubConstants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.kafkatopubsub; + +/** Constant variables that are used across the template's parts. */ +public class KafkaPubsubConstants { + + /* Config keywords */ + public static final String KAFKA_CREDENTIALS = "kafka"; + public static final String SSL_CREDENTIALS = "ssl"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String BUCKET = "bucket"; +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsub.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsub.java new file mode 100644 index 000000000000..8b4e2b305927 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsub.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.complete.kafkatopubsub; + +import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.configureKafka; +import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.configureSsl; +import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.getKafkaCredentialsFromVault; +import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.isSslSpecified; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClass; +import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClassKafkaAvroDeserializer; +import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions; +import org.apache.beam.examples.complete.kafkatopubsub.transforms.FormatTransform; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link KafkaToPubsub} pipeline is a streaming pipeline which ingests data in JSON format from + * Kafka, and outputs the resulting records to PubSub. Input topics, output topic, Bootstrap servers + * are specified by the user as template parameters.
+ * Kafka may be configured with the SASL/SCRAM security mechanism; in that case, a Vault secret
+ * storage with credentials should be provided. The URL to the credentials and the Vault token are
+ * specified by the user as template parameters.
+ *
+ * <p><b>Pipeline Requirements</b>
+ *
+ * <ul>
+ *   <li>Kafka Bootstrap Server(s) up and running.
+ *   <li>Existing source Kafka topic(s).
+ *   <li>An existing Pub/Sub destination output topic.
+ * </ul>
+ *
+ * <p><b>Example Usage</b>
+ *
+ * <pre>
    + * # Gradle preparation
    + *
+ * To run this example, your {@code build.gradle} file should contain the following task
    + * to execute the pipeline:
    + * {@code
    + * task execute (type:JavaExec) {
    + *     main = System.getProperty("mainClass")
    + *     classpath = sourceSets.main.runtimeClasspath
    + *     systemProperties System.getProperties()
    + *     args System.getProperty("exec.args", "").split()
    + * }
    + * }
    + *
+ * This task allows you to run the pipeline via the following command:
    + * {@code
    + * gradle clean execute -DmainClass=org.apache.beam.examples.complete.kafkatopubsub.KafkaToPubsub \
+ *      -Dexec.args="--<argument>=<value> --<argument>=<value>"
    + * }
    + *
    + * # Running the pipeline
    + * To execute this pipeline, specify the parameters:
    + *
    + * - Kafka Bootstrap servers
    + * - Kafka input topics
    + * - Pub/Sub output topic
    + * - Output format
    + *
    + * in the following format:
    + * {@code
    + * --bootstrapServers=host:port \
    + * --inputTopics=your-input-topic \
+ * --outputTopic=projects/your-project-id/topics/your-topic-name \
    + * --outputFormat=AVRO|PUBSUB
    + * }
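+ *
+ * Since bootstrapServers and inputTopics are comma-separated lists, several values can be
+ * supplied, e.g. (an illustrative sketch with placeholder hosts and topics):
+ * {@code
+ * --bootstrapServers=host1:9092,host2:9092 \
+ * --inputTopics=topic1,topic2
+ * }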
    + *
    + * Optionally, to retrieve Kafka credentials for SASL/SCRAM,
    + * specify a URL to the credentials in HashiCorp Vault and the vault access token:
    + * {@code
    + * --secretStoreUrl=http(s)://host:port/path/to/credentials
    + * --vaultToken=your-token
    + * }
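+ *
+ * The credentials are fetched with a plain HTTP GET carrying the Vault token header (see
+ * {@code Utils.getKafkaCredentialsFromVault}), so the stored secret can also be inspected
+ * manually, for example (an illustration; the URL is the same placeholder as above):
+ * {@code
+ * curl -H "X-Vault-Token: your-token" http(s)://host:port/path/to/credentials
+ * }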
    + *
+ * Optionally, to configure a secure SSL connection between the Beam pipeline and Kafka,
    + * specify the parameters:
    + * - A path to a truststore file (it can be a local path or a GCS path, which should start with `gs://`)
    + * - A path to a keystore file (it can be a local path or a GCS path, which should start with `gs://`)
    + * - Truststore password
    + * - Keystore password
    + * - Key password
    + * {@code
    + * --truststorePath=path/to/kafka.truststore.jks
    + * --keystorePath=path/to/kafka.keystore.jks
    + * --truststorePassword=your-truststore-password
    + * --keystorePassword=your-keystore-password
    + * --keyPassword=your-key-password
    + * }
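+ *
+ * If you need a keystore and truststore just for testing, they can be generated with the
+ * standard JDK keytool (a sketch with placeholder names; not part of this example):
+ * {@code
+ * keytool -genkeypair -keystore kafka.keystore.jks -alias localhost -validity 365 -keyalg RSA
+ * keytool -export -keystore kafka.keystore.jks -alias localhost -file cert-file
+ * keytool -import -keystore kafka.truststore.jks -alias CARoot -file cert-file
+ * }
+ *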
    + * By default this will run the pipeline locally with the DirectRunner. To change the runner, specify:
    + * {@code
    + * --runner=YOUR_SELECTED_RUNNER
    + * }
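+ *
+ * For example, to run on Google Cloud Dataflow (a sketch; the project, region, and bucket are
+ * placeholders you must supply):
+ * {@code
+ * --runner=DataflowRunner \
+ * --project=your-project-id \
+ * --region=your-region \
+ * --tempLocation=gs://your-bucket/temp
+ * }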
+ * </pre>
+ *
+ * <p><b>Example Avro usage</b>
+ *
+ * <pre>
+ * This template contains an example class to deserialize AVRO from Kafka and serialize it to AVRO in Pub/Sub.
+ *
+ * To use this example in your specific case, follow these steps:
+ * <ul>
+ *   <li>Create your own class to describe the AVRO schema. As an example, see {@code AvroDataClass}: just define the necessary fields.
+ *   <li>Create your own Avro deserializer class, as in {@code AvroDataClassKafkaAvroDeserializer}: just rename it and use your own schema class for the necessary types.
+ *   <li>Modify the {@code FormatTransform.readAvrosFromKafka} method: pass your schema class and deserializer to the related parameters.
+ * </ul>
+ * </pre>
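+ *
+ * <p>For reference, the value-deserializer swap inside the {@code
+ * FormatTransform.readAvrosFromKafka} method looks like the following sketch (it uses this
+ * example's own classes):
+ *
+ * <pre>{@code
+ * KafkaIO.<String, AvroDataClass>read()
+ *     .withValueDeserializerAndCoder(
+ *         AvroDataClassKafkaAvroDeserializer.class, AvroCoder.of(AvroDataClass.class))
+ * }</pre>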
+ */
+public class KafkaToPubsub {
+
+  /* Logger for class.*/
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaToPubsub.class);
+
+  /**
+   * Main entry point for pipeline execution.
+   *
+   * @param args Command line arguments to the pipeline.
+   */
+  public static void main(String[] args) {
+    KafkaToPubsubOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToPubsubOptions.class);
+
+    run(options);
+  }
+
+  /**
+   * Runs a pipeline which reads messages from Kafka and writes them to Pub/Sub.
+   *
+   * @param options arguments to the pipeline
+   */
+  public static PipelineResult run(KafkaToPubsubOptions options) {
+    // Configure Kafka consumer properties
+    Map<String, Object> kafkaConfig = new HashMap<>();
+    Map<String, String> sslConfig = new HashMap<>();
+    if (options.getSecretStoreUrl() != null && options.getVaultToken() != null) {
+      Map<String, Map<String, String>> credentials =
+          getKafkaCredentialsFromVault(options.getSecretStoreUrl(), options.getVaultToken());
+      kafkaConfig = configureKafka(credentials.get(KafkaPubsubConstants.KAFKA_CREDENTIALS));
+    } else {
+      LOG.warn(
+          "No information to retrieve Kafka credentials was provided. "
+              + "Trying to initiate an unauthorized connection.");
+    }
+
+    if (isSslSpecified(options)) {
+      sslConfig.putAll(configureSsl(options));
+    } else {
+      LOG.info(
+          "No information to retrieve SSL certificate was provided by parameters. "
+              + "Trying to initiate a plain text connection.");
+    }
+
+    List<String> topicsList = new ArrayList<>(Arrays.asList(options.getInputTopics().split(",")));
+
+    checkArgument(
+        topicsList.size() > 0 && topicsList.get(0).length() > 0,
+        "inputTopics cannot be an empty string.");
+
+    List<String> bootstrapServersList =
+        new ArrayList<>(Arrays.asList(options.getBootstrapServers().split(",")));
+
+    checkArgument(
+        bootstrapServersList.size() > 0 && bootstrapServersList.get(0).length() > 0,
+        "bootstrapServers cannot be an empty string.");
+
+    // Create the pipeline
+    Pipeline pipeline = Pipeline.create(options);
+    LOG.info(
+        "Starting Kafka-To-PubSub pipeline with parameters bootstrap servers: "
+            + options.getBootstrapServers()
+            + " input topics: "
+            + options.getInputTopics()
+            + " output pubsub topic: "
+            + options.getOutputTopic());
+
+    /*
+     * Steps:
+     *  1) Read messages in from Kafka
+     *  2) Extract values only
+     *  3) Write successful records to PubSub
+     */
+    if (options.getOutputFormat() == FormatTransform.FORMAT.AVRO) {
+      pipeline
+          .apply(
+              "readAvrosFromKafka",
+              FormatTransform.readAvrosFromKafka(
+                  options.getBootstrapServers(), topicsList, kafkaConfig, sslConfig))
+          .apply("createValues", Values.create())
+          .apply("writeAvrosToPubSub", PubsubIO.writeAvros(AvroDataClass.class));
+
+    } else {
+      pipeline
+          .apply(
+              "readFromKafka",
+              FormatTransform.readFromKafka(
+                  options.getBootstrapServers(), topicsList, kafkaConfig, sslConfig))
+          .apply("createValues", Values.create())
+          .apply("writeToPubSub", new FormatTransform.FormatOutput(options));
+    }
+
+    return pipeline.run();
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/README.md
new file mode 100644
index 000000000000..13f2d3a5f593
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/README.md
@@ -0,0 +1,200 @@
+
+
+# Apache Beam pipeline example to ingest data from Apache Kafka to Google Cloud Pub/Sub
+
+This directory contains an [Apache Beam](https://beam.apache.org/) pipeline example that reads data
+from one or more topics in
+[Apache Kafka](https://kafka.apache.org/) and writes data into a single topic
+in [Google Cloud Pub/Sub](https://cloud.google.com/pubsub).
+
+Supported data formats:
+
+- Serializable plaintext formats, such as JSON
+- [PubSubMessage](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
+
+Supported input source configurations:
+
+- Single or multiple Apache Kafka bootstrap servers
+- Apache Kafka SASL/SCRAM authentication over plaintext or SSL connection
+- Secrets vault service [HashiCorp Vault](https://www.vaultproject.io/).
+
+Supported destination configuration:
+
+- Single Google Cloud Pub/Sub topic.
+
+In a simple scenario, the example creates an Apache Beam pipeline that reads messages from a source Kafka server
+with a source topic and streams the text messages into the specified Pub/Sub destination topic. Other scenarios may need
+Kafka SASL/SCRAM authentication, which can be performed over a plain text or SSL encrypted connection. The example supports
+using a single Kafka user account to authenticate in the provided source Kafka servers and topics. To support SASL
+authentication over SSL, the example needs an SSL certificate location and access to a secrets vault service with the
+Kafka username and password, currently supporting HashiCorp Vault.
+
+## Requirements
+
+- Java 8
+- Kafka Bootstrap Server(s) up and running
+- Existing source Kafka topic(s)
+- An existing Pub/Sub destination output topic
+- (Optional) An existing HashiCorp Vault
+- (Optional) A configured secure SSL connection for Kafka
+
+## Getting Started
+
+This section describes what is needed to get the example up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+- Supported Output Formats
+  - PubSubMessage
+  - AVRO
+- E2E tests (TBD)
+
+## Gradle preparation
+
+To run this example, your `build.gradle` file should contain the following task to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows you to run the pipeline via the following command:
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.kafkatopubsub.KafkaToPubsub \
+  -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+## Running the pipeline
+
+To execute this pipeline, specify the parameters:
+
+- Kafka Bootstrap servers
+- Kafka input topics
+- Pub/Sub output topic
+- Output format
+
+in the following format:
+
+```bash
+--bootstrapServers=host:port \
+--inputTopics=your-input-topic \
+--outputTopic=projects/your-project-id/topics/your-topic-name \
+--outputFormat=AVRO|PUBSUB
+```
+
+Optionally, to retrieve Kafka credentials for SASL/SCRAM, specify a URL to the credentials in HashiCorp Vault and the
+vault access token:
+
+```bash
+--secretStoreUrl=http(s)://host:port/path/to/credentials
+--vaultToken=your-token
+```
+
+Optionally, to configure a secure SSL connection between the Beam pipeline and Kafka, specify the parameters:
+
+- A path to a truststore file (it can be a local path or a GCS path, which should start with `gs://`)
+- A path to a keystore file (it can be a local path or a GCS path, which should start with `gs://`)
+- Truststore password
+- Keystore password
+- Key password
+
+```bash
+--truststorePath=path/to/kafka.truststore.jks
+--keystorePath=path/to/kafka.keystore.jks
+--truststorePassword=your-truststore-password
+--keystorePassword=your-keystore-password
+--keyPassword=your-key-password
+```
+
+By default this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+
+```bash
+--runner=YOUR_SELECTED_RUNNER
+```
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and
+the [Examples README](../../../../../../../../../README.md) for more information about how to run this example.
+
+## Running as a Dataflow Template
+
+This example also exists as a Google Dataflow Template, which you can build and run using Google Cloud Platform. See
+its [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/kafka-to-pubsub/README.md) for
+more information.
+
+## Supported Output Formats
+
+This pipeline can output data in the PubSubMessage or AVRO format.
+
+### PubSubMessage
+
+This example supports the PubSubMessage format for output out-of-the-box. No additional changes are required.
+
+### AVRO
+
+This example demonstrates AVRO format support; the following steps should be done to provide it:
+
+- Define a custom class to deserialize AVRO from Kafka [provided in example]
+- Create custom data serialization in Apache Beam
+- Serialize data to AVRO in Pub/Sub [provided in example].
+
+To use this example in your specific case, follow these steps:
+
+- Create your own class to describe the AVRO schema. As an example use [AvroDataClass](avro/AvroDataClass.java). Just define
+  the necessary fields.
+- Create your own Avro Deserializer class. As an example
+  use [AvroDataClassKafkaAvroDeserializer class](avro/AvroDataClassKafkaAvroDeserializer.java). Just rename it and use
+  your own schema class for the necessary types.
+- Modify the [FormatTransform.readAvrosFromKafka method](transforms/FormatTransform.java). Pass your schema class and
+  deserializer to the related parameters.
+
+```java
+return KafkaIO.<String, AvroDataClass>read()
+    ...
+    .withValueDeserializerAndCoder(
+        AvroDataClassKafkaAvroDeserializer.class, AvroCoder.of(AvroDataClass.class)) // put your classes here
+    ...
+```
+
+- [OPTIONAL TO IMPLEMENT] Add a [Beam Transform](https://beam.apache.org/documentation/programming-guide/#transforms) if
+  it is necessary in your case.
+- Modify the write step in the [KafkaToPubsub class](KafkaToPubsub.java) by putting your schema class into the
+  "writeAvrosToPubSub" step.
+  - NOTE: if the class changed during the transform, use the changed class definition.
+
+```java
+if (options.getOutputFormat() == FormatTransform.FORMAT.AVRO) {
+  ...
+      .apply("writeAvrosToPubSub", PubsubIO.writeAvros(AvroDataClass.class)); // put your SCHEMA class here
+  ...
+}
+```
+
+## End to end tests
+
+TBD
+
+_Note: The Kafka to Pub/Sub job executed with a distributed runner supports SSL configuration with the certificate
+located only in GCS._
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/AvroDataClass.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/AvroDataClass.java
new file mode 100644
index 000000000000..8c8702115f65
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/AvroDataClass.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.kafkatopubsub.avro;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/**
+ * Example of an AVRO serialization class. To configure your AVRO schema, change this class to
+ * match your required schema definition.
+ */
+@DefaultCoder(AvroCoder.class)
+public class AvroDataClass {
+
+  String field1;
+  Float field2;
+  Float field3;
+
+  public AvroDataClass(String field1, Float field2, Float field3) {
+    this.field1 = field1;
+    this.field2 = field2;
+    this.field3 = field3;
+  }
+
+  public String getField1() {
+    return field1;
+  }
+
+  public void setField1(String field1) {
+    this.field1 = field1;
+  }
+
+  public Float getField2() {
+    return field2;
+  }
+
+  public void setField2(Float field2) {
+    this.field2 = field2;
+  }
+
+  public Float getField3() {
+    return field3;
+  }
+
+  public void setField3(Float field3) {
+    this.field3 = field3;
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/AvroDataClassKafkaAvroDeserializer.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/AvroDataClassKafkaAvroDeserializer.java
new file mode 100644
index 000000000000..a9aeb72eb196
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/AvroDataClassKafkaAvroDeserializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.kafkatopubsub.avro;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/** Example of a custom AVRO Deserializer. */
+public class AvroDataClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
+    implements Deserializer<AvroDataClass> {
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    configure(new KafkaAvroDeserializerConfig(configs));
+  }
+
+  @Override
+  public AvroDataClass deserialize(String s, byte[] bytes) {
+    return (AvroDataClass) this.deserialize(bytes);
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/package-info.java
new file mode 100644
index 000000000000..1dc4f36ce656
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/avro/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Kafka to Pubsub template. */
+package org.apache.beam.examples.complete.kafkatopubsub.avro;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.java
new file mode 100644
index 000000000000..012f147b77fd
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class to create a Kafka Consumer with configured SSL. */
+public class SslConsumerFactoryFn
+    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+  private final Map<String, String> sslConfig;
+  private static final String TRUSTSTORE_LOCAL_PATH = "/tmp/kafka.truststore.jks";
+  private static final String KEYSTORE_LOCAL_PATH = "/tmp/kafka.keystore.jks";
+
+  /* Logger for class.*/
+  private static final Logger LOG = LoggerFactory.getLogger(SslConsumerFactoryFn.class);
+
+  public SslConsumerFactoryFn(Map<String, String> sslConfig) {
+    this.sslConfig = sslConfig;
+  }
+
+  @SuppressWarnings("nullness")
+  @Override
+  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+    try {
+      String truststoreLocation = sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+      if (truststoreLocation.startsWith("gs://")) {
+        getGcsFileAsLocal(truststoreLocation, TRUSTSTORE_LOCAL_PATH);
+        sslConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE_LOCAL_PATH);
+      } else {
+        checkFileExists(truststoreLocation);
+      }
+
+      String keystoreLocation = sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+      if (keystoreLocation.startsWith("gs://")) {
+        getGcsFileAsLocal(keystoreLocation, KEYSTORE_LOCAL_PATH);
+        sslConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE_LOCAL_PATH);
+      } else {
+        checkFileExists(keystoreLocation);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to retrieve data for SSL", e);
+      return new KafkaConsumer<>(config);
+    }
+
+    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name());
+    config.put(
+        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+        sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+    config.put(
+        SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+        sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+    config.put(
+        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
+        sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+    config.put(
+        SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+        sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
+    config.put(
+        SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+
+    return new KafkaConsumer<>(config);
+  }
+
+  private void checkFileExists(String filePath) throws IOException {
+    LOG.info(
+        "Trying to get the file {} locally. Local files are not supported when using a distributed runner",
+        filePath);
+    File f = new File(filePath);
+    if (f.exists()) {
+      LOG.debug("{} exists", f.getAbsolutePath());
+    } else {
+      LOG.error("{} does not exist", f.getAbsolutePath());
+      throw new IOException();
+    }
+  }
+
+  /**
+   * Reads a file from GCS and writes it locally.
+   *
+   * @param gcsFilePath path to file in GCS in format "gs://your-bucket/path/to/file"
+   * @param outputFilePath path where to save file locally
+   * @throws IOException thrown if not able to read or write file
+   */
+  public static void getGcsFileAsLocal(String gcsFilePath, String outputFilePath)
+      throws IOException {
+    LOG.info("Reading contents from GCS file: {}", gcsFilePath);
+    Set<StandardOpenOption> options = new HashSet<>(2);
+    options.add(StandardOpenOption.CREATE);
+    options.add(StandardOpenOption.APPEND);
+    // Copies the GCS file into a local file; throws an I/O exception if the file is not found.
+    try (ReadableByteChannel readerChannel =
+        FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
+      try (FileChannel writeChannel = FileChannel.open(Paths.get(outputFilePath), options)) {
+        writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE);
+      }
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/Utils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/Utils.java
new file mode 100644
index 000000000000..ff4cf5f3dcd8
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/Utils.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer;
+
+import static org.apache.beam.examples.complete.kafkatopubsub.KafkaPubsubConstants.KAFKA_CREDENTIALS;
+import static org.apache.beam.examples.complete.kafkatopubsub.KafkaPubsubConstants.PASSWORD;
+import static org.apache.beam.examples.complete.kafkatopubsub.KafkaPubsubConstants.USERNAME;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonObject;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonParser;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for construction of the Kafka Consumer. */
+public class Utils {
+
+  /* Logger for class.*/
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Retrieves all credentials from HashiCorp Vault secret storage.
+   *
+   * @param secretStoreUrl URL to the secret storage that contains the credentials for Kafka
+   * @param token Vault token to access the secret storage
+   * @return credentials for the Kafka consumer config
+   */
+  public static Map<String, Map<String, String>> getKafkaCredentialsFromVault(
+      String secretStoreUrl, String token) {
+    Map<String, Map<String, String>> credentialMap = new HashMap<>();
+
+    JsonObject credentials = null;
+    try {
+      HttpClient client = HttpClientBuilder.create().build();
+      HttpGet request = new HttpGet(secretStoreUrl);
+      request.addHeader("X-Vault-Token", token);
+      HttpResponse response = client.execute(request);
+      String json = EntityUtils.toString(response.getEntity(), "UTF-8");
+
+      /*
+      Vault's response JSON has a specific schema, where the actual data is placed under
+      {data: {data: <credentials>}}.
+      Example:
+      {
+        "request_id": "6a0bb14b-ef24-256c-3edf-cfd52ad1d60d",
+        "lease_id": "",
+        "renewable": false,
+        "lease_duration": 0,
+        "data": {
+          "data": {
+            "bucket": "kafka_to_pubsub_test",
+            "key_password": "secret",
+            "keystore_password": "secret",
+            "keystore_path": "ssl_cert/kafka.keystore.jks",
+            "password": "admin-secret",
+            "truststore_password": "secret",
+            "truststore_path": "ssl_cert/kafka.truststore.jks",
+            "username": "admin"
+          },
+          "metadata": {
+            "created_time": "2020-10-20T11:43:11.109186969Z",
+            "deletion_time": "",
+            "destroyed": false,
+            "version": 8
+          }
+        },
+        "wrap_info": null,
+        "warnings": null,
+        "auth": null
+      }
+      */
+      // Parse security properties from the response JSON
+      credentials =
+          JsonParser.parseString(json)
+              .getAsJsonObject()
+              .get("data")
+              .getAsJsonObject()
+              .getAsJsonObject("data");
+    } catch (IOException e) {
+      LOG.error("Failed to retrieve credentials from Vault.", e);
+    }
+
+    if (credentials != null) {
+      // Username and password for Kafka authorization
+      credentialMap.put(KAFKA_CREDENTIALS, new HashMap<>());
+
+      if (credentials.has(USERNAME) && credentials.has(PASSWORD)) {
+        credentialMap.get(KAFKA_CREDENTIALS).put(USERNAME, credentials.get(USERNAME).getAsString());
+        credentialMap.get(KAFKA_CREDENTIALS).put(PASSWORD, credentials.get(PASSWORD).getAsString());
+      } else {
+        LOG.warn(
+            "There is no username and/or password for Kafka in Vault. "
+                + "Trying to initiate an unauthorized connection.");
+      }
+    }
+
+    return credentialMap;
+  }
+
+  /**
+   * Configures the Kafka consumer for an authorized connection.
+   *
+   * @param props username and password for Kafka
+   * @return configuration set of parameters for Kafka
+   */
+  public static Map<String, Object> configureKafka(@Nullable Map<String, String> props) {
+    // Create the configuration for Kafka
+    Map<String, Object> config = new HashMap<>();
+    if (props != null && props.containsKey(USERNAME) && props.containsKey(PASSWORD)) {
+      config.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+      config.put(
+          SaslConfigs.SASL_JAAS_CONFIG,
+          String.format(
+              "org.apache.kafka.common.security.scram.ScramLoginModule required "
+                  + "username=\"%s\" password=\"%s\";",
+              props.get(USERNAME), props.get(PASSWORD)));
+    }
+    return config;
+  }
+
+  public static Map<String, String> configureSsl(KafkaToPubsubOptions options) {
+    Map<String, String> config = new HashMap<>();
+    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, options.getTruststorePath());
+    config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, options.getKeystorePath());
+    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, options.getTruststorePassword());
+    config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, options.getKeystorePassword());
+    config.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, options.getKeyPassword());
+
+    return config;
+  }
+
+  public static boolean isSslSpecified(KafkaToPubsubOptions options) {
+    return options.getTruststorePath() != null
+        || options.getTruststorePassword() != null
+        || options.getKeystorePath() != null
+        || options.getKeyPassword() != null;
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/package-info.java
new file mode 100644
index 000000000000..54aff5e766e7
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Kafka to Pubsub template. */
+package org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/options/KafkaToPubsubOptions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/options/KafkaToPubsubOptions.java
new file mode 100644
index 000000000000..7e0460571ef1
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/options/KafkaToPubsubOptions.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.kafkatopubsub.options;
+
+import org.apache.beam.examples.complete.kafkatopubsub.transforms.FormatTransform;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+
+public interface KafkaToPubsubOptions extends PipelineOptions {
+  @Description(
+      "Comma-separated list of Kafka Bootstrap Servers (e.g: server1:[port],server2:[port]).")
+  @Validation.Required
+  String getBootstrapServers();
+
+  void setBootstrapServers(String value);
+
+  @Description(
+      "Comma-separated list of Kafka topic(s) to read the input from (e.g: topic1,topic2).")
+  @Validation.Required
+  String getInputTopics();
+
+  void setInputTopics(String value);
+
+  @Description(
+      "The Cloud Pub/Sub topic to publish to. "
+          + "The name should be in the format of "
+          + "projects/<project-id>/topics/<topic-name>.")
+  @Validation.Required
+  String getOutputTopic();
+
+  void setOutputTopic(String outputTopic);
+
+  @Description(
+      "Format which will be written to the output Pub/Sub topic. Supported formats: AVRO, PUBSUB")
+  @Validation.Required
+  FormatTransform.FORMAT getOutputFormat();
+
+  void setOutputFormat(FormatTransform.FORMAT outputFormat);
+
+  @Description("URL to credentials in Vault")
+  String getSecretStoreUrl();
+
+  void setSecretStoreUrl(String secretStoreUrl);
+
+  @Description("Vault token")
+  String getVaultToken();
+
+  void setVaultToken(String vaultToken);
+
+  @Description("The path to the trust store file")
+  String getTruststorePath();
+
+  void setTruststorePath(String truststorePath);
+
+  @Description("The path to the key store file")
+  String getKeystorePath();
+
+  void setKeystorePath(String keystorePath);
+
+  @Description("The password for the trust store file")
+  String getTruststorePassword();
+
+  void setTruststorePassword(String truststorePassword);
+
+  @Description("The password for the key store file")
+  String getKeystorePassword();
+
+  void setKeystorePassword(String keystorePassword);
+
+  @Description("The password of the private key in the key store file")
+  String getKeyPassword();
+
+  void setKeyPassword(String keyPassword);
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/options/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/options/package-info.java
new file mode 100644
index 000000000000..361a5eab058c
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/options/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Kafka to Pubsub template. */ +package org.apache.beam.examples.complete.kafkatopubsub.options; diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/package-info.java new file mode 100644 index 000000000000..66b4c04f763f --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Kafka to Pubsub template. */ +package org.apache.beam.examples.complete.kafkatopubsub; diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/transforms/FormatTransform.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/transforms/FormatTransform.java new file mode 100644 index 000000000000..b6c5846f83ce --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/transforms/FormatTransform.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.examples.complete.kafkatopubsub.transforms;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClass;
+import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClassKafkaAvroDeserializer;
+import org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.SslConsumerFactoryFn;
+import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+/** Different transformations over the processed data in the pipeline. */
+public class FormatTransform {
+
+  public enum FORMAT {
+    PUBSUB,
+    AVRO
+  }
+
+  /**
+   * Configures the Kafka consumer.
+   *
+   * @param bootstrapServers Kafka servers to read from
+   * @param topicsList Kafka topics to read from
+   * @param kafkaConfig configuration for the Kafka consumer
+   * @param sslConfig configuration for the SSL connection
+   * @return configured reading from Kafka
+   */
+  public static PTransform<PBegin, PCollection<KV<String, String>>> readFromKafka(
+      String bootstrapServers,
+      List<String> topicsList,
+      Map<String, Object> kafkaConfig,
+      Map<String, String> sslConfig) {
+    return KafkaIO.<String, String>read()
+        .withBootstrapServers(bootstrapServers)
+        .withTopics(topicsList)
+        .withKeyDeserializerAndCoder(
+            StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()).getValueCoder())
+        .withValueDeserializerAndCoder(
+            StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()).getValueCoder())
+        .withConsumerConfigUpdates(kafkaConfig)
+        .withConsumerFactoryFn(new SslConsumerFactoryFn(sslConfig))
+        .withoutMetadata();
+  }
+
+  /**
+   * Configures the Kafka consumer to read Avro records into the {@link AvroDataClass} format.
+   *
+   * @param bootstrapServers Kafka servers to read from
+   * @param topicsList Kafka topics to read from
+   * @param config configuration for the Kafka consumer
+   * @param sslConfig configuration for the SSL connection
+   * @return configured reading from Kafka
+   */
+  public static PTransform<PBegin, PCollection<KV<String, AvroDataClass>>> readAvrosFromKafka(
+      String bootstrapServers,
+      List<String> topicsList,
+      Map<String, Object> config,
+      Map<String, String> sslConfig) {
+    return KafkaIO.<String, AvroDataClass>read()
+        .withBootstrapServers(bootstrapServers)
+        .withTopics(topicsList)
+        .withKeyDeserializerAndCoder(
+            StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()).getValueCoder())
+        .withValueDeserializerAndCoder(
+            AvroDataClassKafkaAvroDeserializer.class, AvroCoder.of(AvroDataClass.class))
+        .withConsumerConfigUpdates(config)
+        .withConsumerFactoryFn(new SslConsumerFactoryFn(sslConfig))
+        .withoutMetadata();
+  }
+
+  /**
+   * The {@link FormatOutput} transform wraps String-serializable messages in the {@link
+   * PubsubMessage} class.
+   */
+  public static class FormatOutput extends PTransform<PCollection<String>, PDone> {
+
+    private final KafkaToPubsubOptions options;
+
+    public FormatOutput(KafkaToPubsubOptions options) {
+      this.options = options;
+    }
+
+    @Override
+    public PDone expand(PCollection<String> input) {
+      return input
+          .apply(
+              "convertMessagesToPubsubMessages",
+              MapElements.into(TypeDescriptor.of(PubsubMessage.class))
+                  .via(
+                      (String json) ->
+                          new PubsubMessage(json.getBytes(Charsets.UTF_8), ImmutableMap.of())))
+          .apply(
+              "writePubsubMessagesToPubSub", PubsubIO.writeMessages().to(options.getOutputTopic()));
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/transforms/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/transforms/package-info.java
new file mode 100644
index 000000000000..83b9d5c3c956
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/transforms/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Kafka to Pubsub template. */
+package org.apache.beam.examples.complete.kafkatopubsub.transforms;
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsubTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsubTest.java
new file mode 100644
index 000000000000..e71b30459723
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/kafkatopubsub/KafkaToPubsubTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.kafkatopubsub;
+
+import static org.apache.beam.examples.complete.kafkatopubsub.KafkaPubsubConstants.PASSWORD;
+import static org.apache.beam.examples.complete.kafkatopubsub.KafkaPubsubConstants.USERNAME;
+import static org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils.getKafkaCredentialsFromVault;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.Utils;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test of KafkaToPubsub. */
+@RunWith(JUnit4.class)
+public class KafkaToPubsubTest {
+
+  /** Tests configureKafka() with null input properties. */
+  @Test
+  public void testConfigureKafkaNullProps() {
+    Map<String, Object> config = Utils.configureKafka(null);
+    Assert.assertEquals(new HashMap<>(), config);
+  }
+
+  /** Tests configureKafka() without a password in the input properties. */
+  @Test
+  public void testConfigureKafkaNoPassword() {
+    Map<String, String> props = new HashMap<>();
+    props.put(USERNAME, "username");
+    Map<String, Object> config = Utils.configureKafka(props);
+    Assert.assertEquals(new HashMap<>(), config);
+  }
+
+  /** Tests configureKafka() without a username in the input properties. */
+  @Test
+  public void testConfigureKafkaNoUsername() {
+    Map<String, String> props = new HashMap<>();
+    props.put(PASSWORD, "password");
+    Map<String, Object> config = Utils.configureKafka(props);
+    Assert.assertEquals(new HashMap<>(), config);
+  }
+
+  /** Tests configureKafka() with appropriate input properties. */
+  @Test
+  public void testConfigureKafka() {
+    Map<String, String> props = new HashMap<>();
+    props.put(USERNAME, "username");
+    props.put(PASSWORD, "password");
+
+    Map<String, Object> expectedConfig = new HashMap<>();
+    expectedConfig.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+    expectedConfig.put(
+        SaslConfigs.SASL_JAAS_CONFIG,
+        String.format(
+            "org.apache.kafka.common.security.scram.ScramLoginModule required "
+                + "username=\"%s\" password=\"%s\";",
+            props.get(USERNAME), props.get(PASSWORD)));
+
+    Map<String, Object> config = Utils.configureKafka(props);
+    Assert.assertEquals(expectedConfig, config);
+  }
+
+  /** Tests getKafkaCredentialsFromVault() with an invalid url. */
+  @Test
+  public void testGetKafkaCredentialsFromVaultInvalidUrl() {
+    Map<String, Map<String, String>> credentials =
+        getKafkaCredentialsFromVault("some-url", "some-token");
+    Assert.assertEquals(new HashMap<>(), credentials);
+  }
+}