diff --git a/.travis.yml b/.travis.yml
index 25cfece665809..90ad90ca16234 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,18 +1,24 @@
-language: java
+language: java C++
jdk:
- oraclejdk8
cache:
directories:
- $HOME/.m2
-
+ - $HOME/pulsar-dep
# Reconstruct the gpg keys to sign the artifacts
before_deploy:
- echo $GPG_SECRET_KEYS | base64 --decode | $GPG_EXECUTABLE --import --batch || true
- echo $GPG_OWNERTRUST | base64 --decode | $GPG_EXECUTABLE --import-ownertrust --batch || true
+install:
+ - sudo bash -x $TRAVIS_BUILD_DIR/pulsar-client-cpp/travis-build.sh $HOME/pulsar-dep $TRAVIS_BUILD_DIR dep
+
+after_success:
+ - sudo bash -x $TRAVIS_BUILD_DIR/pulsar-client-cpp/travis-build.sh $HOME/pulsar-dep $TRAVIS_BUILD_DIR compile
+
deploy:
-
provider: script
@@ -27,6 +33,7 @@ deploy:
on:
tags: true
-
+
provider: releases
api_key:
secure: mIACMQlTzepBa9wWbKlME1Ig9/vKLHz63fn2zR2fbYjaAgXCPVe1lbTLbOZb62yB0XL7rUekyKIf0DEFH/cS1XjCb7aDKo4pLh6uiHdIIS8t1qEJJT54vbLwnWJXjo+14QHvaXgDlO/YoxpyOicrKI++b3fScD0zK2I8R6Lmwan/ZQze9uhRO0RKGChsDAszy+98C6JJxQXWQ0YjnUhwP5PtZX3Fm1rxtuCIk2Fl9gQdp9/j9U6vRKtWaO22Q2YaaaPGGoVyTwV6iMSOXDMb6zhjEQ3aiuJMJHUJRcGEU4fV7hkiUukWdo5+5C/mASNiJDYefG86KfCktMniPMzyAPXNc6hUzbOZuLNI1/f1QqBwzTJbH7NIUjz5f0hjNsHuYvkL8TcxE9pDA0Qkr8OIWR8M3+H7WKuiSTaVSeCobGBE8g6ymanlRvOQZblFpgw91B/KmZucsin0+rV5tVRlqTBYHL5f6fXEyhKdGYRiHaNR29mBBJsZng2tR6wVjPGqyEfdwFVOs44d2Rkt885VjZthap/Yw+SJKOvbJv1zaRglmbvbl629LvYOgT6ptYPDJyu/J/kzPrWnzvyTf72M6bR991Kx8gEkT4WRwCRBAuhg8i2bmIcsjbXtLcB0YRHgrBueJD0SuLREtcxJYvkgMI1UZon5UrCTkJDc0oFLO28=
diff --git a/pom.xml b/pom.xml
index 72de492e6d205..d2073b306c54f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -466,6 +466,11 @@ flexible messaging model and an intuitive client API.
logs/**
**/*.versionsBackup
**/circe/**
+ pulsar-client-cpp/lib/checksum/int_types.h
+ pulsar-client-cpp/lib/checksum/crc32c*
+ pulsar-client-cpp/lib/lz4/lz4.*
+ pulsar-client-cpp/lib/PulsarApi.pb.*
+ pulsar-client-cpp/CMakeFiles/**
JAVADOC_STYLE
diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
new file mode 100644
index 0000000000000..b0dbd788e44ff
--- /dev/null
+++ b/pulsar-client-cpp/.gitignore
@@ -0,0 +1,58 @@
+# Compiled Object files
+*.slo
+*.lo
+*.o
+*.obj
+*.os
+*.scons*
+
+# Compiled Dynamic libraries
+*.so
+*.dylib
+*.dll
+*.so.1
+
+# Compiled Static libraries
+*.lai
+*.la
+*.a
+*.lib
+
+# Executables
+*.exe
+*.out
+*.app
+
+# Dependency file
+*.d
+
+# Mac swap file
+*.DS_Store
+
+# Linux swap file
+*.swp
+
+# Exclude compiled executables
+/examples/SampleProducer
+/examples/SampleConsumer
+/examples/SampleAsyncProducer
+/examples/SampleConsumerListener
+/tests/main
+/perf/PerfProducer
+/perf/PerfConsumer
+/system-test/SystemTest
+
+# Eclipse generated files
+.cproject
+.project
+.settings/
+.pydevproject
+
+# doxygen files
+apidocs/
+
+# CMAKE
+Makefile
+cmake_install.cmake
+CMakeFiles
+CMakeCache.txt
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
new file mode 100644
index 0000000000000..4e74dbc1a603c
--- /dev/null
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -0,0 +1,65 @@
+#
+# Copyright 2016 Yahoo Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.8)
+project (pulsar-cpp)
+
+set(Boost_NO_BOOST_CMAKE ON)
+
+set (CMAKE_CXX_FLAGS "-Wno-deprecated-declarations ${CMAKE_CXX_FLAGS}")
+
+find_package(Boost REQUIRED COMPONENTS program_options filesystem regex thread system)
+find_package(OpenSSL REQUIRED)
+find_package(ZLIB REQUIRED)
+find_package(ProtoBuf QUIET)
+if(NOT ProtoBuf_FOUND)
+ find_library(PROTOBUF_LIBRARIES protobuf)
+endif()
+find_library(LOG4CXX_LIBRARY_PATH log4cxx)
+find_library(CURL_LIBRARY_PATH curl)
+find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h)
+
+include_directories(
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/include
+ ${Boost_INCLUDE_DIR}
+ ${OPENSSL_INCLUDE_DIR}
+ ${ZLIB_INCLUDE_DIR}
+ ${PROTOBUF_INCLUDE_DIR}
+ ${LOG4CXX_INCLUDE_PATH}
+)
+
+set(COMMON_LIBS
+ ${COMMON_LIBS}
+ ${Boost_LIBRARIES}
+ ${OPENSSL_LIBRARIES}
+ ${ZLIB_LIBRARIES}
+ ${PROTOBUF_LIBRARIES}
+ ${LOG4CXX_LIBRARY_PATH}
+ ${CURL_LIBRARY_PATH}
+)
+
+link_directories(${CMAKE_BINARY_DIR}/lib)
+
+set(CLIENT_LIBS
+ ${COMMON_LIBS}
+ pulsar
+)
+
+add_subdirectory(lib)
+add_subdirectory(perf)
+add_subdirectory(examples)
+add_subdirectory(tests)
diff --git a/pulsar-client-cpp/NOTICE b/pulsar-client-cpp/NOTICE
new file mode 100644
index 0000000000000..9154f7fcf7bdb
--- /dev/null
+++ b/pulsar-client-cpp/NOTICE
@@ -0,0 +1,33 @@
+
+Pulsar C++ Client Library
+
+Copyright 2016 Yahoo Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+This product contain sources from LZ4 - Fast LZ compression algorithm:
+ * Copyright (C) 2011-2015, Yann Collet.
+ * LICENSE: BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
+ * HOMEPAGE: https://github.com/Cyan4973/lz4
+
+
+This product contains sources from Circe CRC library:
+ * Copyright 2014 Trevor Robinson
+ * LICENSE : Apache License, Version 2.0
+ * HOMEPAGE: https://github.com/trevorr/circe
+
+This product contains sources from Adler CRC32c implementation:
+ * Copyright (C) 2013 Mark Adler
+ * LICENSE: ZLib License (https://opensource.org/licenses/Zlib)
+ * HOMEPAGE: https://github.com/baruch/crcbench/blob/master/crc-mark-adler.c
diff --git a/pulsar-client-cpp/README.md b/pulsar-client-cpp/README.md
new file mode 100644
index 0000000000000..d9b8b3c128c9b
--- /dev/null
+++ b/pulsar-client-cpp/README.md
@@ -0,0 +1,66 @@
+
+### Pulsar C++ client library
+
+Examples for using the API to publish and consume messages can be found on
+https://github.com/yahoo/pulsar/tree/master/pulsar-client-cpp/examples
+
+#### Requirements
+
+ * CMake
+ * Boost
+ * [Protocol Buffer 2.6](https://developers.google.com/protocol-buffers/)
+ * [Log4CXX](https://logging.apache.org/log4cxx)
+ * LibCurl
+ * [GTest](https://github.com/google/googletest)
+
+
+#### Compile on Ubuntu Server 16.04
+
+Install all dependencies:
+
+```shell
+apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \
+ libprotobuf-dev libboost-all-dev libgtest-dev
+```
+
+Compile and install Google Test:
+
+```shell
+cd /usr/src/gtest
+sudo cmake .
+sudo make
+sudo cp *.a /usr/lib
+```
+
+
+Compile Pulsar client library:
+
+```shell
+cd pulsar/pulsar-client-cpp
+cmake .
+make
+```
+
+Client library will be placed in
+```
+lib/libpulsar.so
+lib/libpulsar.a
+```
+
+Tools :
+
+```
+perf/perfProducer
+perf/perfConsumer
+```
+
+Tests:
+
+```
+ 1. Start the standalone pulsar
+ export PULSAR_STANDALONE_CONF=tests/standalone.conf
+ ../bin/pulsar standalone
+
+ 2. Run tests
+ tests/main
+```
diff --git a/pulsar-client-cpp/eclipse-formatter.xml b/pulsar-client-cpp/eclipse-formatter.xml
new file mode 100644
index 0000000000000..3df512ba46ffb
--- /dev/null
+++ b/pulsar-client-cpp/eclipse-formatter.xml
@@ -0,0 +1,186 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-client-cpp/examples/CMakeLists.txt b/pulsar-client-cpp/examples/CMakeLists.txt
new file mode 100644
index 0000000000000..b5f36b5eab001
--- /dev/null
+++ b/pulsar-client-cpp/examples/CMakeLists.txt
@@ -0,0 +1,41 @@
+#
+# Copyright 2016 Yahoo Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set(SAMPLE_ASYNC_PRODUCER_SOURCES
+ SampleAsyncProducer.cc
+)
+
+set(SAMPLE_CONSUMER_SOURCES
+ SampleConsumer.cc
+)
+
+set(SAMPLE_CONSUMER_LISTENER_SOURCES
+ SampleConsumerListener.cc
+)
+
+set(SAMPLE_PRODUCER_SOURCES
+ SampleProducer.cc
+)
+
+add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
+add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
+add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
+add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
+
+target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS})
+target_link_libraries(SampleConsumer ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerListener ${CLIENT_LIBS})
+target_link_libraries(SampleProducer ${CLIENT_LIBS})
diff --git a/pulsar-client-cpp/examples/SampleAsyncProducer.cc b/pulsar-client-cpp/examples/SampleAsyncProducer.cc
new file mode 100644
index 0000000000000..16d5637e6e1e3
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleAsyncProducer.cc
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+
+#include
+#include
+
+#include
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+void callback(Result code, const Message& msg) {
+ LOG_INFO("Received code: " << code << " -- Msg: " << msg);
+}
+
+int main() {
+ Client client("pulsar://localhost:6650");
+
+ Producer producer;
+ Result result = client.createProducer("persistent://prop/r1/ns1/my-topic", producer);
+ if (result != ResultOk) {
+ LOG_ERROR("Error creating producer: " << result);
+ return -1;
+ }
+
+ // Send asynchronously
+ while (true) {
+ Message msg = MessageBuilder().setContent("content").setProperty("x", "1").build();
+ producer.sendAsync(msg, callback);
+
+ sleep(1);
+ }
+}
diff --git a/pulsar-client-cpp/examples/SampleConsumer.cc b/pulsar-client-cpp/examples/SampleConsumer.cc
new file mode 100644
index 0000000000000..f8853710b2301
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumer.cc
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+#include
+
+#include
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+
+ Client client("pulsar://localhost:6650");
+
+ Consumer consumer;
+ Result result = client.subscribe("persistent://prop/r1/ns1/my-topic", "consumer-1", consumer);
+ if (result != ResultOk) {
+ LOG_ERROR("Failed to subscribe: " << result);
+ return -1;
+ }
+
+ Message msg;
+
+ while (true) {
+ consumer.receive(msg);
+ LOG_INFO("Received: " << msg << " with payload '" << msg.getDataAsString() << "'");
+
+ consumer.acknowledge(msg);
+ }
+}
diff --git a/pulsar-client-cpp/examples/SampleConsumerListener.cc b/pulsar-client-cpp/examples/SampleConsumerListener.cc
new file mode 100644
index 0000000000000..b9b427b9860ce
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerListener.cc
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+#include
+
+#include
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+void listener(Consumer consumer, const Message& msg) {
+ LOG_INFO("Got message " << msg << " with content '" << msg.getDataAsString() << "'");
+
+ consumer.acknowledge(msg.getMessageId());
+}
+
+int main() {
+ Client client("pulsar://localhost:6650");
+
+ Consumer consumer;
+ ConsumerConfiguration config;
+ config.setMessageListener(listener);
+ Result result = client.subscribe("persistent://prop/r1/ns1/my-topic", "consumer-1", config, consumer);
+ if (result != ResultOk) {
+ LOG_ERROR("Failed to subscribe: " << result);
+ return -1;
+ }
+
+ // Wait
+ int n;
+ std::cin >> n;
+}
diff --git a/pulsar-client-cpp/examples/SampleProducer.cc b/pulsar-client-cpp/examples/SampleProducer.cc
new file mode 100644
index 0000000000000..471ebfcc98384
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleProducer.cc
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+
+#include
+#include
+
+#include
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+ Client client("pulsar://localhost:6650");
+
+ Producer producer;
+ Result result = client.createProducer("persistent://prop/r1/ns1/my-topic", producer);
+ if (result != ResultOk) {
+ LOG_ERROR("Error creating producer: " << result);
+ return -1;
+ }
+
+ // Send synchronously
+ Message msg = MessageBuilder().setContent("content").build();
+ Result res = producer.send(msg);
+ LOG_INFO("Message sent: " << res);
+}
diff --git a/pulsar-client-cpp/generate_protobuf.sh b/pulsar-client-cpp/generate_protobuf.sh
new file mode 100755
index 0000000000000..e565196d78f7c
--- /dev/null
+++ b/pulsar-client-cpp/generate_protobuf.sh
@@ -0,0 +1,19 @@
+#
+# Copyright 2016 Yahoo Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+PROTO_PATH=../pulsar-common/src/main/proto/
+
+protoc --proto_path=$PROTO_PATH --cpp_out=lib $PROTO_PATH/PulsarApi.proto
diff --git a/pulsar-client-cpp/include/pulsar/Auth.h b/pulsar-client-cpp/include/pulsar/Auth.h
new file mode 100644
index 0000000000000..b305b22b26897
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Auth.h
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PULSAR_AUTH_H_
+#define PULSAR_AUTH_H_
+
+#include
+#include
+#include
+#include
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class Authentication {
+ public:
+ virtual ~Authentication();
+ virtual const std::string getAuthMethodName() const = 0;
+ virtual Result getAuthData(std::string& authDataContent) const = 0;
+
+ protected:
+ Authentication();
+};
+
+typedef boost::shared_ptr AuthenticationPtr;
+
+class Auth {
+ public:
+ static AuthenticationPtr Disabled();
+ static AuthenticationPtr create(const std::string& dynamicLibPath);
+ static AuthenticationPtr create(const std::string& dynamicLibPath, const std::string& params);
+ protected:
+ static bool isShutdownHookRegistered_;
+ static std::vector loadedLibrariesHandles_;
+ static void release_handles();
+};
+
+}
+// namespace pulsar
+
+#pragma GCC visibility pop
+
+#endif /* PULSAR_AUTH_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/BatchMessageId.h b/pulsar-client-cpp/include/pulsar/BatchMessageId.h
new file mode 100644
index 0000000000000..d21b94b50fddc
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/BatchMessageId.h
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_BATCHMESSAGEID_H_
+#define LIB_BATCHMESSAGEID_H_
+
+#include
+
+namespace pulsar {
+class BatchMessageId : public MessageId {
+ public:
+ BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1)
+ : MessageId(ledgerId, entryId),
+ batchIndex_(batchIndex) {
+ }
+
+ BatchMessageId()
+ : batchIndex_(-1) {
+ }
+ // These functions compare the message order as stored in bookkeeper
+ inline bool operator<(const BatchMessageId& mID) const;
+ inline bool operator<=(const BatchMessageId& mID) const;
+ protected:
+ friend class ConsumerImpl;
+ friend class Message;
+ friend class MessageImpl;
+ friend class PartitionedProducerImpl;
+ friend class PartitionedConsumerImpl;
+ friend class BatchAcknowledgementTracker;
+ friend class PulsarFriend;
+ int64_t batchIndex_;
+};
+
+bool BatchMessageId::operator<(const BatchMessageId& mID) const {
+ return (ledgerId_ < mID.ledgerId_) || (ledgerId_ == mID.ledgerId_ && entryId_ < mID.entryId_);
+}
+
+bool BatchMessageId::operator<=(const BatchMessageId& mID) const {
+ return (ledgerId_ < mID.ledgerId_) || (ledgerId_ == mID.ledgerId_ && entryId_ <= mID.entryId_);
+}
+
+}
+#endif /* LIB_BATCHMESSAGEID_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
new file mode 100644
index 0000000000000..18b1dc61ba830
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -0,0 +1,202 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PULSAR_CLIENT_HPP_
+#define PULSAR_CLIENT_HPP_
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#pragma GCC visibility push(default)
+
+class PulsarFriend;
+
+namespace pulsar {
+
+typedef boost::function CreateProducerCallback;
+typedef boost::function SubscribeCallback;
+typedef boost::function CloseCallback;
+
+class ClientConfiguration {
+ public:
+
+ ClientConfiguration();
+ ~ClientConfiguration();
+ ClientConfiguration(const ClientConfiguration&);
+ ClientConfiguration& operator=(const ClientConfiguration&);
+
+ /**
+ * Set the authentication method to be used with the broker
+ *
+ * @param authentication the authentication data to use
+ */
+ ClientConfiguration& setAuthentication(const AuthenticationPtr& authentication);
+
+ /**
+ * @return the authentication data
+ */
+ const Authentication& getAuthentication() const;
+
+ /**
+ * Set timeout on client operations (subscribe, create producer, close, unsubscribe)
+ * Default is 30 seconds.
+ *
+ * @param timeout the timeout after which the operation will be considered as failed
+ */
+ ClientConfiguration& setOperationTimeoutSeconds(int timeout);
+
+ /**
+ * @return the client operations timeout in seconds
+ */
+ int getOperationTimeoutSeconds() const;
+
+ /**
+ * Set the number of IO threads to be used by the Pulsar client. Default is 1
+ * thread.
+ *
+ * @param threads number of threads
+ */
+ ClientConfiguration& setIOThreads(int threads);
+
+ /**
+ * @return the number of IO threads to use
+ */
+ int getIOThreads() const;
+
+ /**
+ * Set the number of threads to be used by the Pulsar client when delivering messages
+ * through message listener. Default is 1 thread per Pulsar client.
+ *
+ * If using more than 1 thread, messages for distinct MessageListener will be
+ * delivered in different threads, however a single MessageListener will always
+ * be assigned to the same thread.
+ *
+ * @param threads number of threads
+ */
+ ClientConfiguration& setMessageListenerThreads(int threads);
+
+ /**
+ * @return the number of IO threads to use
+ */
+ int getMessageListenerThreads() const;
+
+ /**
+ * Initialize the log configuration
+ *
+ * @param logConfFilePath path of the configuration file
+ */
+ ClientConfiguration& setLogConfFilePath(const std::string& logConfFilePath);
+
+ /**
+ * Get the path of log configuration file (log4cpp)
+ */
+ const std::string& getLogConfFilePath() const;
+
+ private:
+ const AuthenticationPtr& getAuthenticationPtr() const;
+
+ struct Impl;
+ boost::shared_ptr impl_;
+ friend class ClientImpl;
+};
+
+class ClientImpl;
+
+class Client {
+ public:
+ /**
+ * Create a Pulsar client object connecting to the specified cluster address and using the default configuration.
+ *
+ * @param serviceUrl the Pulsar endpoint to use (eg: http://brokerv2-pdev.messaging.corp.gq1.yahoo.com:4080 for Sandbox access)
+ */
+ Client(const std::string& serviceUrl);
+
+ /**
+ * Create a Pulsar client object connecting to the specified cluster address and using the specified configuration.
+ *
+ * @param serviceUrl the Pulsar endpoint to use (eg: http://brokerv2-pdev.messaging.corp.gq1.yahoo.com:4080 for Sandbox access)
+ * @param clientConfiguration the client configuration to use
+ */
+ Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
+
+ /**
+ * Create a producer with default configuration
+ *
+ * @see createProducer(const std::string&, const ProducerConfiguration&, Producer&)
+ *
+ * @param topic the topic where the new producer will publish
+ * @param producer a non-const reference where the new producer will be copied
+ * @return ResultOk if the producer has been successfully created
+ * @return ResultError if there was an error
+ */
+ Result createProducer(const std::string& topic, Producer& producer);
+
+ /**
+ * Create a producer with specified configuration
+ *
+ * @see createProducer(const std::string&, const ProducerConfiguration&, Producer&)
+ *
+ * @param topic the topic where the new producer will publish
+ * @param conf the producer config to use
+ * @param producer a non-const reference where the new producer will be copied
+ * @return ResultOk if the producer has been successfully created
+ * @return ResultError if there was an error
+ */
+ Result createProducer(const std::string& topic, const ProducerConfiguration& conf,
+ Producer& producer);
+
+ void createProducerAsync(const std::string& topic, CreateProducerCallback callback);
+
+ void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
+ CreateProducerCallback callback);
+
+ Result subscribe(const std::string& topic, const std::string& consumerName, Consumer& consumer);
+ Result subscribe(const std::string& topic, const std::string& consumerName,
+ const ConsumerConfiguration& conf, Consumer& consumer);
+
+ void subscribeAsync(const std::string& topic, const std::string& consumerName,
+ SubscribeCallback callback);
+ void subscribeAsync(const std::string& topic, const std::string& consumerName,
+ const ConsumerConfiguration& conf, SubscribeCallback callback);
+
+ /**
+ *
+ * @return
+ */
+ Result close();
+
+ void closeAsync(CloseCallback callback);
+
+ void shutdown();
+
+ private:
+ Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections);
+
+ friend class PulsarFriend;
+ boost::shared_ptr impl_;
+};
+
+}
+
+#pragma GCC visibility pop
+
+#endif /* PULSAR_CLIENT_HPP_ */
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
new file mode 100644
index 0000000000000..e7621fd9a7d32
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -0,0 +1,303 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CONSUMER_HPP_
+#define CONSUMER_HPP_
+
+#include
+#include
+#include
+#include
+
+#pragma GCC visibility push(default)
+
+class PulsarFriend;
+
+namespace pulsar {
+
+class Consumer;
+
+/// Callback definition for non-data operation
+typedef boost::function ResultCallback;
+
+/// Callback definition for MessageListener
+typedef boost::function MessageListener;
+
+enum ConsumerType {
+ /**
+ * There can be only 1 consumer on the same topic with the same consumerName
+ */
+ ConsumerExclusive,
+
+ /**
+ * Multiple consumers will be able to use the same consumerName and the messages
+ * will be dispatched according to a round-robin rotation between the connected consumers
+ */
+ ConsumerShared,
+
+ /** Only one consumer is active on the subscription; Subscription can have N consumers
+ * connected one of which will get promoted to master if the current master becomes inactive
+ */
+
+ ConsumerFailover
+};
+
+/**
+ * Class specifying the configuration of a consumer.
+ */
+class ConsumerConfiguration {
+ public:
+ ConsumerConfiguration();
+ ~ConsumerConfiguration();
+ ConsumerConfiguration(const ConsumerConfiguration&);
+ ConsumerConfiguration& operator=(const ConsumerConfiguration&);
+
+ /**
+ * Specify the consumer type. The consumer type enables
+ * specifying the type of subscription. In Exclusive subscription,
+ * only a single consumer is allowed to attach to the subscription. Other consumers
+ * will get an error message. In Shared subscription, multiple consumers will be
+ * able to use the same subscription name and the messages will be dispatched in a
+ * round robin fashion. In Failover subscription, a master-slave subscription model
+ * allows for multiple consumers to attach to a single subscription, though only one
+ * of them will be “master” at a given time. Only the master consumer will receive
+ * messages. When the master gets disconnected, one among the slaves will be promoted
+ * to master and will start getting messages.
+ */
+ ConsumerConfiguration& setConsumerType(ConsumerType consumerType);
+ ConsumerType getConsumerType() const;
+
+ /**
+ * A message listener enables your application to configure how to process
+ * and acknowledge messages delivered. A listener will be called in order
+ * for every message received.
+ */
+ ConsumerConfiguration& setMessageListener(MessageListener messageListener);
+ MessageListener getMessageListener() const;
+ bool hasMessageListener() const;
+
+ /**
+ * Sets the size of the consumer receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by the Consumer before the
+ * application calls receive(). Using a higher value could potentially increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling pre-fetching of
+ * messages. This approach improves the message distribution on shared subscription, by pushing messages only to
+ * the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can be
+ * used if the consumer queue size is zero. The receive() function call should not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ * the new receiver queue size value
+ */
+ void setReceiverQueueSize(int size);
+ int getReceiverQueueSize() const;
+
+ void setConsumerName(const std::string&);
+ const std::string& getConsumerName() const;
+
+ /**
+ * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
+ * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
+ * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
+ * redelivered.
+ * @param timeout in milliseconds
+ */
+ void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
+
+ /**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+ long getUnAckedMessagesTimeoutMs() const;
+ private:
+ struct Impl;
+ boost::shared_ptr impl_;
+};
+
+class ConsumerImplBase;
+
+/**
+ *
+ */
+class Consumer {
+ public:
+ /**
+ * Construct an uninitialized consumer object
+ */
+ Consumer();
+
+ /**
+ * @return the topic this consumer is subscribed to
+ */
+ const std::string& getTopic() const;
+
+ /**
+ * @return the consumer name
+ */
+ const std::string& getSubscriptionName() const;
+
+ /**
+ * Unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @see asyncUnsubscribe
+ * @return Result::ResultOk if the unsubscribe operation completed successfully
+ * @return Result::ResultError if the unsubscribe operation failed
+ */
+ Result unsubscribe();
+
+ /**
+ * Asynchronously unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @param callback the callback to get notified when the operation is complete
+ */
+ void unsubscribeAsync(ResultCallback callback);
+
+ /**
+ * Receive a single message.
+ *
+ * If a message is not immediately available, this method will block until a new
+ * message is available.
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+ Result receive(Message& msg);
+
+ /**
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return ResultOk if a message was received
+ * @return ResultTimeout if the receive timeout was triggered
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+ Result receive(Message& msg, int timeoutMs);
+
+ /**
+ * Acknowledge the reception of a single message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the message will not be re-delivered to this consumer.
+ *
+ * @see asyncAcknowledge
+ * @param message the message to acknowledge
+ * @return ResultOk if the message was successfully acknowledged
+ * @return ResultError if there was a failure
+ */
+ Result acknowledge(const Message& message);
+ Result acknowledge(const MessageId& messageId);
+
+ /**
+ * Asynchronously acknowledge the reception of a single message.
+ *
+ * This method will initiate the operation and return immediately. The provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been acknowledged
+ */
+ void acknowledgeAsync(const Message& message, ResultCallback callback);
+ void acknowledgeAsync(const MessageId& messageID, ResultCallback callback);
+
+ /**
+ * Acknowledge the reception of all the messages in the stream up to (and including)
+ * the provided message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the messages will not be re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(const Message&, ResultCallback) and
+ * waiting for the callback to be triggered.
+ *
+ * @param message the last message in the stream to acknowledge
+ * @return ResultOk if the message was successfully acknowledged. All previously delivered messages for this topic are also acknowledged.
+ * @return ResultError if there was a failure
+ */
+ Result acknowledgeCumulative(const Message& message);
+ Result acknowledgeCumulative(const MessageId& messageId);
+
+ /**
+ * Asynchronously acknowledge the reception of all the messages in the stream up to (and
+ * including) the provided message.
+ *
+ * This method will initiate the operation and return immediately. The provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been acknowledged
+ */
+ void acknowledgeCumulativeAsync(const Message& message, ResultCallback callback);
+ void acknowledgeCumulativeAsync(const MessageId& messageId, ResultCallback callback);
+
+ Result close();
+
+ void closeAsync(ResultCallback callback);
+
+ /*
+ * Pause receiving messages via the messageListener, till resumeMessageListener() is called.
+ */
+ Result pauseMessageListener();
+
+ /*
+ * Resume receiving the messages via the messageListener.
+ * Asynchronously receive all the messages enqueued from time pauseMessageListener() was called.
+ */
+ Result resumeMessageListener();
+
+ /**
+ * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+ * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
+ * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
+ * breaks, the messages are redelivered after reconnect.
+ */
+ void redeliverUnacknowledgedMessages();
+
+ private:
+ typedef boost::shared_ptr ConsumerImplBasePtr;
+ friend class PulsarFriend;
+ ConsumerImplBasePtr impl_;
+ explicit Consumer(ConsumerImplBasePtr);
+
+ friend class PartitionedConsumerImpl;
+ friend class ConsumerImpl;
+ friend class ClientImpl;
+ friend class ConsumerTest;
+};
+
+}
+
+#pragma GCC visibility pop
+
+#endif /* CONSUMER_HPP_ */
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
new file mode 100644
index 0000000000000..9218e2ca7be44
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MESSAGE_HPP_
+#define MESSAGE_HPP_
+
+#include