diff --git a/docs/Documentation.md b/docs/Documentation.md
index 04b97fc5cdf57..b2b42b8cc7747 100644
--- a/docs/Documentation.md
+++ b/docs/Documentation.md
@@ -12,6 +12,7 @@
- [Geo-Replication](GeoReplication.md)
- [WebSocket API](WebSocket.md)
- [Apache Storm adaptor](PulsarStorm.md)
+ - [Spark Streaming Pulsar Receiver](PulsarSpark.md)
- [Modular Load Manager](ModularLoadManager.md)
* Internal Docs
- [Binary protocol specification](BinaryProtocol.md)
diff --git a/docs/PulsarSpark.md b/docs/PulsarSpark.md
new file mode 100644
index 0000000000000..0e963fac1759d
--- /dev/null
+++ b/docs/PulsarSpark.md
@@ -0,0 +1,45 @@
+# Spark Streaming Pulsar Receiver
+
+
+
+- [Introduction](#introduction)
+- [Using Spark Streaming Pulsar Receiver](#using-spark-streaming-pulsar-receiver)
+- [Example](#example)
+
+
+
+## Introduction
+Spark Streaming Pulsar Receiver is a custom receiver which enables Apache Spark Streaming to receive data from Pulsar.
+
+An application can receive RDD format data via Spark Streaming Pulsar Receiver and process it in various ways.
+
+## Using Spark Streaming Pulsar Receiver
+Include the dependency for Spark Streaming Pulsar Receiver:
+
+```xml
+<dependency>
+  <groupId>com.yahoo.pulsar</groupId>
+  <artifactId>pulsar-spark</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+```
+
+Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` method of `JavaStreamingContext`:
+```java
+SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
+JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
+
+ClientConfiguration clientConf = new ClientConfiguration();
+ConsumerConfiguration consConf = new ConsumerConfiguration();
+String url = "pulsar://localhost:6650/";
+String topic = "persistent://sample/standalone/ns1/topic1";
+String subs = "sub1";
+
+JavaReceiverInputDStream<byte[]> msgs = jssc
+ .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
+```
+
+
+## Example
+You can find a complete example [here](../pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java).
+This example counts how many of the received messages contain the string "Pulsar".
diff --git a/docs/locale/ja/Documentation.md b/docs/locale/ja/Documentation.md
index ade0fbc3d3706..e7fd9e0b1636c 100644
--- a/docs/locale/ja/Documentation.md
+++ b/docs/locale/ja/Documentation.md
@@ -11,5 +11,6 @@
- [ジオレプリケーション](GeoReplication.md)
- [WebSocket API](WebSocket.md)
- [Apache Stormのためのアダプタ](PulsarStorm.md)
+ - [Spark Streaming Pulsar Receiver](PulsarSpark.md)
* 内部仕様
- [バイナリプロトコルの仕様](BinaryProtocol.md)
diff --git a/docs/locale/ja/PulsarSpark.md b/docs/locale/ja/PulsarSpark.md
new file mode 100644
index 0000000000000..09d4c633fe2c5
--- /dev/null
+++ b/docs/locale/ja/PulsarSpark.md
@@ -0,0 +1,45 @@
+# Spark Streaming Pulsar Receiver
+
+
+
+- [概要](#概要)
+- [Spark Streaming Pulsar Receiverの利用](#spark-streaming-pulsar-receiverの利用)
+- [実装例](#実装例)
+
+
+
+## 概要
+Spark Streaming Pulsar ReceiverはApache Spark StreamingがPulsarからデータを受け取るためのCustom Receiverです。
+
+アプリケーションはSpark Streaming Pulsar Receiverを通してRDD形式のデータを受け取り、様々な処理を行うことができます。
+
+## Spark Streaming Pulsar Receiverの利用
+Spark Streaming Pulsar Receiverの依存をincludeします:
+
+```xml
+<dependency>
+  <groupId>com.yahoo.pulsar</groupId>
+  <artifactId>pulsar-spark</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+```
+
+JavaStreamingContextのreceiverStreamメソッドにSparkStreamingPulsarReceiverのインスタンスを渡します:
+```java
+SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
+JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
+
+ClientConfiguration clientConf = new ClientConfiguration();
+ConsumerConfiguration consConf = new ConsumerConfiguration();
+String url = "pulsar://localhost:6650/";
+String topic = "persistent://sample/standalone/ns1/topic1";
+String subs = "sub1";
+
+JavaReceiverInputDStream<byte[]> msgs = jssc
+ .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
+```
+
+
+## 実装例
+完全な実装の例は[SparkStreamingPulsarReceiverExample.java](../../../pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java)を参照してください。
+この例では、受け取ったメッセージのうち"Pulsar"という文字列が含まれるものがいくつあるかを数えます。
diff --git a/pom.xml b/pom.xml
index 3bd6b9e48efef..bffcbd9098573 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@ flexible messaging model and an intuitive client API.
pulsar-websocket
pulsar-discovery-service
pulsar-storm
+ pulsar-spark
pulsar-zookeeper-utils
pulsar-checksum
pulsar-testclient
@@ -404,6 +405,34 @@ flexible messaging model and an intuitive client API.
${athenz.version}
+
+ org.apache.spark
+ spark-streaming_2.10
+ 2.1.0
+
+
+ com.google.guava
+ guava
+
+
+ io.netty
+ netty-codec-http
+
+
+ io.netty
+ netty-transport-native-epoll
+
+
+ io.netty
+ netty
+
+
+ io.netty
+ netty-all
+
+
+
+
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
new file mode 100644
index 0000000000000..99580d42ebe69
--- /dev/null
+++ b/pulsar-spark/pom.xml
@@ -0,0 +1,124 @@
+
+
+ 4.0.0
+
+
+ com.yahoo.pulsar
+ pulsar
+ 1.17-SNAPSHOT
+ ..
+
+
+ pulsar-spark
+ Spark Streaming Pulsar Receivers
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+ test
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+ test
+ test-jar
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+ ${project.groupId}
+ managed-ledger
+ ${project.version}
+ test
+ test-jar
+
+
+
+ ${project.groupId}
+ pulsar-client
+ ${project.version}
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ org.apache.spark
+ spark-streaming_2.10
+
+
+
+ com.google.guava
+ guava
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+ true
+ true
+
+
+ com.google.guava:guava
+ io.netty:netty-codec-http
+ io.netty:netty-transport-native-epoll
+ io.netty:netty
+ io.netty:netty-all
+
+
+
+
+ com.google
+ com.yahoo.pulsar.shade.com.google
+
+
+ io.netty
+ com.yahoo.pulsar.shade.io.netty
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java
new file mode 100644
index 0000000000000..eb433bc647bfb
--- /dev/null
+++ b/pulsar-spark/src/main/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -0,0 +1,132 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yahoo.pulsar.spark;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+import com.yahoo.pulsar.client.api.ClientConfiguration;
+import com.yahoo.pulsar.client.api.Consumer;
+import com.yahoo.pulsar.client.api.ConsumerConfiguration;
+import com.yahoo.pulsar.client.api.Message;
+import com.yahoo.pulsar.client.api.MessageListener;
+import com.yahoo.pulsar.client.api.PulsarClient;
+import com.yahoo.pulsar.client.api.PulsarClientException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Spark Streaming custom receiver that subscribes to a Pulsar topic and passes the payload of each
+ * received message to Spark through {@code store}.
+ *
+ * <p>A message is acknowledged only after {@code store} returned without throwing; if storing fails
+ * the error is logged and the message is left unacknowledged.
+ */
+public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
+
+    private final ClientConfiguration clientConfiguration;
+    private final ConsumerConfiguration consumerConfiguration;
+    private final String url;
+    private final String topic;
+    private final String subscription;
+    // Created in onStart() and closed in onStop().
+    private PulsarClient pulsarClient;
+
+    /**
+     * Creates a receiver that stores received data with {@code StorageLevel.MEMORY_AND_DISK_2()}.
+     *
+     * @param clientConfiguration configuration used to create the Pulsar client
+     * @param consumerConfiguration consumer configuration used for the subscription; it is modified in
+     *        place (a 0 ack timeout is replaced by 60 seconds and the message listener is set)
+     * @param url Pulsar service URL passed to {@code PulsarClient.create}
+     * @param topic topic to subscribe to
+     * @param subscription subscription name
+     */
+    public SparkStreamingPulsarReceiver(ClientConfiguration clientConfiguration,
+            ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
+        this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration, consumerConfiguration, url, topic, subscription);
+    }
+
+    /**
+     * Creates a receiver with an explicit storage level.
+     *
+     * <p>The given {@code consumerConfiguration} is modified in place: an ack timeout of 0 is replaced by
+     * 60 seconds, and its message listener is set to one that stores and then acknowledges each message.
+     *
+     * @param storageLevel storage level handed to the Spark {@code Receiver} base class
+     * @param clientConfiguration configuration used to create the Pulsar client
+     * @param consumerConfiguration consumer configuration used for the subscription (modified, see above)
+     * @param url Pulsar service URL passed to {@code PulsarClient.create}
+     * @param topic topic to subscribe to
+     * @param subscription subscription name
+     */
+    public SparkStreamingPulsarReceiver(StorageLevel storageLevel, ClientConfiguration clientConfiguration,
+            ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
+        super(storageLevel);
+        this.clientConfiguration = clientConfiguration;
+        this.consumerConfiguration = consumerConfiguration;
+        this.url = url;
+        this.topic = topic;
+        this.subscription = subscription;
+        if (consumerConfiguration.getAckTimeoutMillis() == 0) {
+            // NOTE(review): presumably 0 means "no ack timeout"; a timeout is set so that messages left
+            // unacknowledged after a failed store() can be redelivered - confirm against the client docs.
+            consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+        }
+        consumerConfiguration.setMessageListener((consumer, msg) -> {
+            try {
+                store(msg.getData());
+                consumer.acknowledgeAsync(msg);
+            } catch (Exception e) {
+                // Pass the exception itself so the stack trace is not lost.
+                log.error("Failed to store a message : {}", e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * Creates the Pulsar client and subscribes to the topic. If either step fails with a
+     * {@link PulsarClientException}, the error is logged and a restart of the receiver is requested.
+     */
+    @Override
+    public void onStart() {
+        try {
+            pulsarClient = PulsarClient.create(url, clientConfiguration);
+            pulsarClient.subscribe(topic, subscription, consumerConfiguration);
+        } catch (PulsarClientException e) {
+            log.error("Failed to start subscription : {}", e.getMessage(), e);
+            restart("Restart a consumer", e);
+        }
+    }
+
+    /** Closes the Pulsar client if one was created; a failure to close is logged and not rethrown. */
+    @Override
+    public void onStop() {
+        try {
+            if (pulsarClient != null) {
+                pulsarClient.close();
+            }
+        } catch (PulsarClientException e) {
+            log.error("Failed to close client : {}", e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java
new file mode 100644
index 0000000000000..6160039ae22a2
--- /dev/null
+++ b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -0,0 +1,104 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.spark;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doNothing;
+
+import com.yahoo.pulsar.client.api.*;
+import org.mockito.ArgumentCaptor;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import com.yahoo.pulsar.spark.SparkStreamingPulsarReceiver;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Checks that a message published to a topic is delivered to the message listener set on the consumer
+ * configuration that {@link SparkStreamingPulsarReceiver} subscribes with.
+ */
+public class SparkStreamingPulsarReceiverTest extends MockedPulsarServiceBaseTest {
+
+    private static final String TEST_MESSAGE = "pulsar-spark test message";
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        internalSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void testReceivedMessage() throws Exception {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        ConsumerConfiguration consConf = new ConsumerConfiguration();
+        String url = "pulsar://127.0.0.1:" + BROKER_PORT + "/";
+        String topic = "persistent://p1/c1/ns1/topic1";
+        String subs = "sub1";
+
+        SparkStreamingPulsarReceiver receiver = spy(
+                new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
+        MessageListener msgListener = spy(new MessageListener() {
+            @Override
+            public void received(Consumer consumer, Message msg) {
+                return;
+            }
+        });
+        final ArgumentCaptor<Consumer> consCaptor = ArgumentCaptor.forClass(Consumer.class);
+        final ArgumentCaptor<Message> msgCaptor = ArgumentCaptor.forClass(Message.class);
+        doNothing().when(msgListener).received(consCaptor.capture(), msgCaptor.capture());
+        // Swap the listener on the configuration object the receiver holds, so deliveries can be captured.
+        consConf.setMessageListener(msgListener);
+
+        receiver.onStart();
+        try {
+            waitForTransmission();
+            PulsarClient pulsarClient = PulsarClient.create(url, clientConf);
+            try {
+                Producer producer = pulsarClient.createProducer(topic, new ProducerConfiguration());
+                producer.send(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8));
+                waitForTransmission();
+            } finally {
+                // Release the producer-side client even when sending fails.
+                pulsarClient.close();
+            }
+        } finally {
+            receiver.onStop();
+        }
+        assertEquals(new String(msgCaptor.getValue().getData(), StandardCharsets.UTF_8), TEST_MESSAGE);
+    }
+
+    /** Sleeps for one second to give asynchronous subscription and delivery time to complete. */
+    private static void waitForTransmission() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of silently dropping the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
new file mode 100644
index 0000000000000..8e3c5d7415dbf
--- /dev/null
+++ b/pulsar-spark/src/test/java/com/yahoo/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yahoo.pulsar.spark.example;
+
+import com.yahoo.pulsar.client.api.Message;
+import com.yahoo.pulsar.spark.SparkStreamingPulsarReceiver;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import com.yahoo.pulsar.client.api.ClientConfiguration;
+import com.yahoo.pulsar.client.api.ConsumerConfiguration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Arrays;
+
+/**
+ * Example Spark Streaming application that receives messages from a Pulsar topic through
+ * {@link SparkStreamingPulsarReceiver} and prints how many of them contain the string "Pulsar".
+ */
+public class SparkStreamingPulsarReceiverExample {
+    public static void main(String[] args) throws InterruptedException {
+        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
+        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
+
+        ClientConfiguration clientConf = new ClientConfiguration();
+        ConsumerConfiguration consConf = new ConsumerConfiguration();
+        String url = "pulsar://localhost:6650/";
+        String topic = "persistent://sample/standalone/ns1/topic1";
+        String subs = "sub1";
+
+        JavaReceiverInputDStream<byte[]> msgs = jssc
+                .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
+
+        // Map each message to 1 when its payload contains "Pulsar" and to 0 otherwise.
+        JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
+            @Override
+            public Iterator<Integer> call(byte[] msg) {
+                boolean containsPulsar = new String(msg, StandardCharsets.UTF_8).contains("Pulsar");
+                return Arrays.asList(containsPulsar ? 1 : 0).iterator();
+            }
+        });
+
+        // Sum the flags to obtain the number of matching messages.
+        JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
+            @Override
+            public Integer call(Integer i1, Integer i2) {
+                return i1 + i2;
+            }
+        });
+
+        numOfPulsar.print();
+
+        jssc.start();
+        jssc.awaitTermination();
+    }
+}