Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def send_callback(res, msg):

client.close()
"""
import logging

import _pulsar

Expand Down Expand Up @@ -335,6 +336,7 @@ def __init__(self, service_url,
message_listener_threads=1,
concurrent_lookup_requests=50000,
log_conf_file_path=None,
logger=None,
use_tls=False,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
Expand Down Expand Up @@ -368,6 +370,8 @@ def __init__(self, service_url,
to prevent overload on the broker.
* `log_conf_file_path`:
Initialize log4cxx from a configuration file.
* `logger`:
Set a Python logger for this Pulsar client.
* `use_tls`:
Configure whether to use TLS encryption on the connection. This setting
is deprecated. TLS will be automatically enabled if the `serviceUrl` is
Expand All @@ -390,6 +394,7 @@ def __init__(self, service_url,
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
_check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
_check_type_or_none(logging.Logger, logger, 'logger')
_check_type(bool, use_tls, 'use_tls')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
Expand All @@ -404,6 +409,8 @@ def __init__(self, service_url,
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
Expand Down
103 changes: 103 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,102 @@
*/
#include "utils.h"

class LoggerWrapper: public Logger {
std::string _logger;
PyObject* _pyLogger;
int _currentPythonLogLevel = 10 + (Logger::LEVEL_INFO*10);

void _updateCurrentPythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
_currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
};

public:

LoggerWrapper(const std::string &logger, PyObject* pyLogger) : _logger(logger) {
_pyLogger = pyLogger;
Py_XINCREF(_pyLogger);

_updateCurrentPythonLogLevel();
}

LoggerWrapper(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
}

LoggerWrapper& operator=(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
return *this;
}

virtual ~LoggerWrapper() {
Py_XDECREF(_pyLogger);
}

bool isEnabled(Level level) {
return 10 + (level*10) >= _currentPythonLogLevel;
}

void log(Level level, int line, const std::string& message) {
PyGILState_STATE state = PyGILState_Ensure();

try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
break;
}

} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
}
};

class LoggerWrapperFactory : public LoggerFactory {
static LoggerWrapperFactory* _instance;
PyObject* _pyLogger;

LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
}

public:
virtual ~LoggerWrapperFactory() {
Py_XDECREF(_pyLogger);
}

Logger* getLogger(const std::string &fileName) {
return new LoggerWrapper(fileName, _pyLogger);
}

static LoggerFactoryPtr create(py::object pyLogger) {
return LoggerFactoryPtr(new LoggerWrapperFactory(pyLogger));
}
};


template<typename T>
struct ListenerWrapper {
PyObject* _pyListener;
Expand Down Expand Up @@ -74,6 +170,12 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur
return conf;
}

static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf,
py::object logger) {
conf.setLogger(LoggerWrapperFactory::create(logger));
return conf;
}

void export_config() {
using namespace boost::python;

Expand All @@ -89,6 +191,7 @@ void export_config() {
.def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
.def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy<copy_const_reference>())
.def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
.def("set_logger", &ClientConfiguration_setLogger, return_self<>())
.def("use_tls", &ClientConfiguration::isUseTls)
.def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
.def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath)
Expand Down