From 181d19a76fcb092cd20d6ffb681ca3ac1fa98e47 Mon Sep 17 00:00:00 2001 From: Yuki Shiga Date: Wed, 15 Mar 2017 13:31:09 +0900 Subject: [PATCH] Added spark streaming custom receiver for pulsar --- docs/Documentation.md | 1 + docs/PulsarSpark.md | 45 +++++++ docs/locale/ja/Documentation.md | 1 + docs/locale/ja/PulsarSpark.md | 45 +++++++ pom.xml | 29 ++++ pulsar-spark/pom.xml | 124 ++++++++++++++++++ .../spark/SparkStreamingPulsarReceiver.java | 91 +++++++++++++ .../SparkStreamingPulsarReceiverTest.java | 84 ++++++++++++ .../SparkStreamingPulsarReceiverExample.java | 69 ++++++++++ 9 files changed, 489 insertions(+) create mode 100644 docs/PulsarSpark.md create mode 100644 docs/locale/ja/PulsarSpark.md create mode 100644 pulsar-spark/pom.xml create mode 100644 pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java create mode 100644 pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java create mode 100644 pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java diff --git a/docs/Documentation.md b/docs/Documentation.md index d1bafe46c3aa8..03c033b563d22 100644 --- a/docs/Documentation.md +++ b/docs/Documentation.md @@ -12,6 +12,7 @@ - [Geo-Replication](GeoReplication.md) - [WebSocket API](WebSocket.md) - [Apache Storm adaptor](PulsarStorm.md) + - [Spark Streaming Pulsar Receiver](PulsarSpark.md) * Internal Docs - [Binary protocol specification](BinaryProtocol.md) * Other Languages diff --git a/docs/PulsarSpark.md b/docs/PulsarSpark.md new file mode 100644 index 0000000000000..0e963fac1759d --- /dev/null +++ b/docs/PulsarSpark.md @@ -0,0 +1,45 @@ +# Spark Streaming Pulsar Receiver + + + +- [Introduction](#introduction) +- [Using Spark Streaming Pulsar Receiver](#using-spark-streaming-pulsar-receiver) +- [Example](#example) + + + +## Introduction +Spark Streaming Pulsar Receiver is a custom receiver which enables Apache Spark Streaming to receive data from Pulsar. 
+ +An application can receive data in RDD format via Spark Streaming Pulsar Receiver and process it in various ways. + +## Using Spark Streaming Pulsar Receiver +Include the dependency for Spark Streaming Pulsar Receiver: + +```xml +<dependency> + <groupId>com.yahoo.pulsar</groupId> + <artifactId>pulsar-spark</artifactId> + <version>${pulsar.version}</version> +</dependency> +``` + +Pass an instance of SparkStreamingPulsarReceiver to the receiverStream method of JavaStreamingContext: +```java +SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark"); +JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); + +ClientConfiguration clientConf = new ClientConfiguration(); +ConsumerConfiguration consConf = new ConsumerConfiguration(); +String url = "pulsar://localhost:6650/"; +String topic = "persistent://sample/standalone/ns1/topic1"; +String subs = "sub1"; + +JavaReceiverInputDStream<byte[]> msgs = jssc + .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs)); +``` + + +## Example +You can find a complete example [here](../pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java). +This example counts the received messages that contain the string "Pulsar". 
diff --git a/docs/locale/ja/Documentation.md b/docs/locale/ja/Documentation.md index ade0fbc3d3706..e7fd9e0b1636c 100644 --- a/docs/locale/ja/Documentation.md +++ b/docs/locale/ja/Documentation.md @@ -11,5 +11,6 @@ - [ジオレプリケーション](GeoReplication.md) - [WebSocket API](WebSocket.md) - [Apache Stormのためのアダプタ](PulsarStorm.md) + - [Spark Streaming Pulsar Receiver](PulsarSpark.md) * 内部仕様 - [バイナリプロトコルの仕様](BinaryProtocol.md) diff --git a/docs/locale/ja/PulsarSpark.md b/docs/locale/ja/PulsarSpark.md new file mode 100644 index 0000000000000..09d4c633fe2c5 --- /dev/null +++ b/docs/locale/ja/PulsarSpark.md @@ -0,0 +1,45 @@ +# Spark Streaming Pulsar Receiver + + + +- [概要](#概要) +- [Spark Streaming Pulsar Receiverの利用](#spark-streaming-pulsar-receiverの利用) +- [実装例](#実装例) + + + +## 概要 +Spark Streaming Pulsar ReceiverはApache Spark StreamingがPulsarからデータを受け取るためのCustom Receiverです。 + +アプリケーションはSpark Streaming Pulsar Receiverを通してRDD形式のデータを受け取り、様々な処理を行うことができます。 + +## Spark Streaming Pulsar Receiverの利用 +Spark Streaming Pulsar Receiverの依存をincludeします: + +```xml +<dependency> + <groupId>com.yahoo.pulsar</groupId> + <artifactId>pulsar-spark</artifactId> + <version>${pulsar.version}</version> +</dependency> +``` + +JavaStreamingContextのreceiverStreamメソッドにSparkStreamingPulsarReceiverのインスタンスを渡します: +```java +SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark"); +JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); + +ClientConfiguration clientConf = new ClientConfiguration(); +ConsumerConfiguration consConf = new ConsumerConfiguration(); +String url = "pulsar://localhost:6650/"; +String topic = "persistent://sample/standalone/ns1/topic1"; +String subs = "sub1"; + +JavaReceiverInputDStream<byte[]> msgs = jssc + .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs)); +``` + + +## 実装例 +完全な実装の例は[SparkStreamingPulsarReceiverExample.java](../../../pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java)を参照してください。 +この例では、受け取ったメッセージのうち"Pulsar"という文字列が含まれるものがいくつあるかを数えます。 diff 
--git a/pom.xml b/pom.xml index 0c2c219d6405b..8c5eed7c434cf 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,7 @@ flexible messaging model and an intuitive client API. pulsar-websocket pulsar-discovery-service pulsar-storm + pulsar-spark pulsar-zookeeper-utils pulsar-checksum pulsar-testclient @@ -404,6 +405,34 @@ flexible messaging model and an intuitive client API. ${athenz.version} + + org.apache.spark + spark-streaming_2.10 + 2.1.0 + + + com.google.guava + guava + + + io.netty + netty-codec-http + + + io.netty + netty-transport-native-epoll + + + io.netty + netty + + + io.netty + netty-all + + + + diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml new file mode 100644 index 0000000000000..99580d42ebe69 --- /dev/null +++ b/pulsar-spark/pom.xml @@ -0,0 +1,124 @@ + + + 4.0.0 + + + com.yahoo.pulsar + pulsar + 1.17-SNAPSHOT + .. + + + pulsar-spark + Spark Streaming Pulsar Receivers + + + + ${project.groupId} + pulsar-broker + ${project.version} + test + + + + ${project.groupId} + pulsar-broker + ${project.version} + test + test-jar + + + + org.mockito + mockito-core + test + + + + ${project.groupId} + managed-ledger + ${project.version} + test + test-jar + + + + ${project.groupId} + pulsar-client + ${project.version} + + + + org.slf4j + slf4j-api + + + + org.apache.spark + spark-streaming_2.10 + + + + com.google.guava + guava + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + true + true + + + com.google.guava:guava + io.netty:netty-codec-http + io.netty:netty-transport-native-epoll + io.netty:netty + io.netty:netty-all + + + + + com.google + com.yahoo.pulsar.shade.com.google + + + io.netty + com.yahoo.pulsar.shade.io.netty + + + + + + + + + diff --git a/pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java new file mode 100644 index 0000000000000..eb433bc647bfb --- /dev/null +++ 
b/pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yahoo.pulsar.spark;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+import com.yahoo.pulsar.client.api.ClientConfiguration;
+import com.yahoo.pulsar.client.api.Consumer;
+import com.yahoo.pulsar.client.api.ConsumerConfiguration;
+import com.yahoo.pulsar.client.api.Message;
+import com.yahoo.pulsar.client.api.MessageListener;
+import com.yahoo.pulsar.client.api.PulsarClient;
+import com.yahoo.pulsar.client.api.PulsarClientException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Spark Streaming custom receiver that subscribes to a Pulsar topic and stores the payload of every received
+ * message in Spark.
+ *
+ * <p>A message is acknowledged only after {@code store} has returned normally; when storing fails the message is
+ * left unacknowledged and the failure is logged.
+ */
+public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
+
+    private final ClientConfiguration clientConfiguration;
+    private final ConsumerConfiguration consumerConfiguration;
+    private final String url;
+    private final String topic;
+    private final String subscription;
+    private PulsarClient pulsarClient;
+
+    /**
+     * Creates a receiver that stores received data with {@code StorageLevel.MEMORY_AND_DISK_2()}.
+     *
+     * @param clientConfiguration configuration used to create the Pulsar client
+     * @param consumerConfiguration configuration used for the subscription; it is modified in place exactly as
+     *            described for the constructor that takes a storage level
+     * @param url Pulsar service URL passed to {@code PulsarClient.create}
+     * @param topic topic to subscribe to
+     * @param subscription subscription name
+     */
+    public SparkStreamingPulsarReceiver(ClientConfiguration clientConfiguration,
+            ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
+        this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration, consumerConfiguration, url, topic, subscription);
+    }
+
+    /**
+     * Creates a receiver that stores received data with the given storage level.
+     *
+     * <p>The passed {@code consumerConfiguration} is modified in place: an ack timeout of 60 seconds is set when
+     * none is configured ({@code getAckTimeoutMillis() == 0}), and its message listener is replaced by one that
+     * stores each message payload and then acknowledges the message.
+     *
+     * @param storageLevel Spark storage level for the received data
+     * @param clientConfiguration configuration used to create the Pulsar client
+     * @param consumerConfiguration configuration used for the subscription
+     * @param url Pulsar service URL passed to {@code PulsarClient.create}
+     * @param topic topic to subscribe to
+     * @param subscription subscription name
+     * @throws NullPointerException if a configuration, the URL, the topic or the subscription is {@code null}
+     */
+    public SparkStreamingPulsarReceiver(StorageLevel storageLevel, ClientConfiguration clientConfiguration,
+            ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
+        super(storageLevel);
+        this.clientConfiguration = Objects.requireNonNull(clientConfiguration, "clientConfiguration");
+        this.consumerConfiguration = Objects.requireNonNull(consumerConfiguration, "consumerConfiguration");
+        this.url = Objects.requireNonNull(url, "url");
+        this.topic = Objects.requireNonNull(topic, "topic");
+        this.subscription = Objects.requireNonNull(subscription, "subscription");
+
+        if (consumerConfiguration.getAckTimeoutMillis() == 0) {
+            consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+        }
+        consumerConfiguration.setMessageListener((consumer, msg) -> {
+            try {
+                store(msg.getData());
+                consumer.acknowledgeAsync(msg);
+            } catch (Exception e) {
+                // Not acknowledged on purpose; pass the exception itself so the stack trace is logged.
+                log.error("Failed to store a message : {}", e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * Creates the Pulsar client and subscribes to the topic; on failure asks Spark to restart the receiver.
+     */
+    @Override
+    public void onStart() {
+        try {
+            pulsarClient = PulsarClient.create(url, clientConfiguration);
+            pulsarClient.subscribe(topic, subscription, consumerConfiguration);
+        } catch (PulsarClientException e) {
+            log.error("Failed to start subscription : {}", e.getMessage(), e);
+            restart("Restart a consumer", e);
+        }
+    }
+
+    /**
+     * Closes the Pulsar client if one was created; a failure to close is logged and otherwise ignored.
+     */
+    @Override
+    public void onStop() {
+        try {
+            if (pulsarClient != null) {
+                pulsarClient.close();
+            }
+        } catch (PulsarClientException e) {
+            log.error("Failed to close client : {}", e.getMessage(), e);
+        } finally {
+            // Drop the reference so a later onStart() always works with a fresh client.
+            pulsarClient = null;
+        }
+    }
+}
diff --git a/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java
new file mode 100644
index 0000000000000..6160039ae22a2
--- /dev/null
+++ b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.spark;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doNothing;
+
+import com.yahoo.pulsar.client.api.*;
+import org.mockito.ArgumentCaptor;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import com.yahoo.pulsar.spark.SparkStreamingPulsarReceiver;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Verifies that a message published to a topic reaches the message listener of the consumer that
+ * {@link SparkStreamingPulsarReceiver} subscribes with.
+ */
+public class SparkStreamingPulsarReceiverTest extends MockedPulsarServiceBaseTest {
+
+    private static final String TEST_MESSAGE = "pulsar-spark test message";
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        internalSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void testReceivedMessage() throws Exception {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        ConsumerConfiguration consConf = new ConsumerConfiguration();
+        String url = "pulsar://127.0.0.1:" + BROKER_PORT + "/";
+        String topic = "persistent://p1/c1/ns1/topic1";
+        String subs = "sub1";
+
+        SparkStreamingPulsarReceiver receiver = spy(
+                new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
+        MessageListener msgListener = spy(new MessageListener() {
+            @Override
+            public void received(Consumer consumer, Message msg) {
+                // Intentionally empty: the spy only records the arguments it is called with.
+            }
+        });
+        final ArgumentCaptor<Consumer> consCaptor = ArgumentCaptor.forClass(Consumer.class);
+        final ArgumentCaptor<Message> msgCaptor = ArgumentCaptor.forClass(Message.class);
+        doNothing().when(msgListener).received(consCaptor.capture(), msgCaptor.capture());
+        // Replace the listener installed by the receiver's constructor so the test can capture the message.
+        consConf.setMessageListener(msgListener);
+
+        receiver.onStart();
+        waitForTransmission();
+        PulsarClient pulsarClient = PulsarClient.create(url, clientConf);
+        try {
+            Producer producer = pulsarClient.createProducer(topic, new ProducerConfiguration());
+            producer.send(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8));
+            waitForTransmission();
+        } finally {
+            // Stop the receiver and close the producing client even when sending fails.
+            receiver.onStop();
+            pulsarClient.close();
+        }
+        assertEquals(new String(msgCaptor.getValue().getData(), StandardCharsets.UTF_8), TEST_MESSAGE);
+    }
+
+    /** Sleeps for one second to give the message time to be delivered; there is no explicit synchronization. */
+    private static void waitForTransmission() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of silently dropping it.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
new file mode 100644
index 0000000000000..8e3c5d7415dbf
--- /dev/null
+++ b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yahoo.pulsar.spark.example;
+
+import com.yahoo.pulsar.client.api.Message;
+import com.yahoo.pulsar.spark.SparkStreamingPulsarReceiver;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import com.yahoo.pulsar.client.api.ClientConfiguration;
+import com.yahoo.pulsar.client.api.ConsumerConfiguration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Arrays;
+
+/**
+ * Example Spark Streaming application that receives messages from a Pulsar topic through
+ * {@link SparkStreamingPulsarReceiver} and prints, for every batch, how many of them contain "Pulsar".
+ */
+public class SparkStreamingPulsarReceiverExample {
+    public static void main(String[] args) throws InterruptedException {
+        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
+        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
+
+        ClientConfiguration clientConf = new ClientConfiguration();
+        ConsumerConfiguration consConf = new ConsumerConfiguration();
+        String url = "pulsar://localhost:6650/";
+        String topic = "persistent://sample/standalone/ns1/topic1";
+        String subs = "sub1";
+
+        JavaReceiverInputDStream<byte[]> msgs = jssc
+                .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
+
+        // Map each message to 1 when its UTF-8 text contains "Pulsar" and to 0 otherwise.
+        JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
+            @Override
+            public Iterator<Integer> call(byte[] msg) {
+                String text = new String(msg, StandardCharsets.UTF_8);
+                return Arrays.asList(text.contains("Pulsar") ? 1 : 0).iterator();
+            }
+        });
+
+        // Sum the flags of a batch to get the number of matching messages.
+        JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
+            @Override
+            public Integer call(Integer i1, Integer i2) {
+                return i1 + i2;
+            }
+        });
+
+        numOfPulsar.print();
+
+        jssc.start();
+        jssc.awaitTermination();
+    }
+}