diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 3f005543f5366..ad1a07050ff76 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -99,6 +99,7 @@ def send_callback(res, msg_id): client.close() """ +import logging import _pulsar from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401 @@ -362,6 +363,7 @@ def __init__(self, service_url, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, + logger=None ): """ Create a new Pulsar client instance. @@ -405,6 +407,8 @@ def __init__(self, service_url, Configure whether the Pulsar client validates that the hostname of the endpoint, matches the common name on the TLS certificate presented by the endpoint. + * `logger`: + Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. """ _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication') @@ -417,6 +421,7 @@ def __init__(self, service_url, _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') + _check_type_or_none(logging.Logger, logger, 'logger') conf = _pulsar.ClientConfiguration() if authentication: @@ -427,6 +432,8 @@ def __init__(self, service_url, conf.concurrent_lookup_requests(concurrent_lookup_requests) if log_conf_file_path: conf.log_conf_file_path(log_conf_file_path) + if logger: + conf.set_logger(logger) if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): conf.use_tls(True) if tls_trust_certs_file_path: diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 71b67d75af4bc..3d76d50a29b65 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -19,6 +19,7 @@ # +import logging from unittest import TestCase, main import time import os @@ -98,6 +99,10 @@ def test_consumer_config(self): conf.consumer_name("my-name") self.assertEqual(conf.consumer_name(), "my-name") + def test_client_logger(self): + logger = logging.getLogger("pulsar") + Client(self.serviceUrl, logger=logger) + def test_connect_error(self): with self.assertRaises(pulsar.ConnectError): client = Client('fakeServiceUrl') diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 188aaf546d176..c21b1d3b4203b 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -88,6 +88,106 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC return conf; } +class LoggerWrapper: public Logger { + PyObject* _pyLogger; + int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO); + + void _updateCurrentPythonLogLevel() { + PyGILState_STATE state = PyGILState_Ensure(); + + try { + _currentPythonLogLevel = py::call_method(_pyLogger, "getEffectiveLevel"); + } catch (py::error_already_set e) { + PyErr_Print(); + } + + PyGILState_Release(state); + }; + + int _getLogLevelValue(Level level) { + return 10 + (level * 10); + } + + public: + + LoggerWrapper(const std::string &logger, PyObject* pyLogger) { + _pyLogger = pyLogger; + Py_XINCREF(_pyLogger); + + _updateCurrentPythonLogLevel(); + } + + LoggerWrapper(const LoggerWrapper& other) { + _pyLogger = other._pyLogger; + Py_XINCREF(_pyLogger); + } + + LoggerWrapper& operator=(const LoggerWrapper& other) { + _pyLogger = other._pyLogger; + Py_XINCREF(_pyLogger); + return *this; + } + + virtual ~LoggerWrapper() { + Py_XDECREF(_pyLogger); + } + + bool isEnabled(Level level) { + return _getLogLevelValue(level) >= _currentPythonLogLevel; + } + + void log(Level level, int line, const std::string& message) { + PyGILState_STATE state = PyGILState_Ensure(); + + try { + switch (level) { + case Logger::LEVEL_DEBUG: + py::call_method(_pyLogger, "debug", message.c_str()); + break; + case Logger::LEVEL_INFO: + py::call_method(_pyLogger, "info", message.c_str()); + break; + case Logger::LEVEL_WARN: + py::call_method(_pyLogger, "warning", message.c_str()); + break; + case Logger::LEVEL_ERROR: + py::call_method(_pyLogger, "error", message.c_str()); + break; + } + + } catch (py::error_already_set e) { + PyErr_Print(); + } + + PyGILState_Release(state); + } +}; + +class LoggerWrapperFactory : public LoggerFactory { + static LoggerWrapperFactory* _instance; + PyObject* _pyLogger; + + public: + LoggerWrapperFactory(py::object pyLogger) { + _pyLogger = pyLogger.ptr(); + Py_XINCREF(_pyLogger); + } + + virtual ~LoggerWrapperFactory() { + Py_XDECREF(_pyLogger); + } + + Logger* getLogger(const std::string &fileName) { + return new LoggerWrapper(fileName, _pyLogger); + } +}; + +static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf, py::object logger) { + conf.setLogger(new LoggerWrapperFactory(logger)); + return conf; +} + + void export_config() { using namespace boost::python; @@ -110,6 +210,7 @@ void export_config() { .def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection) .def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection, return_self<>()) .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>()) + .def("set_logger", &ClientConfiguration_setLogger, return_self<>()) ; class_("ProducerConfiguration")