Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def send_callback(res, msg_id):
client.close()
"""

import logging
import _pulsar

from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401
Expand Down Expand Up @@ -362,6 +363,7 @@ def __init__(self, service_url,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
tls_validate_hostname=False,
logger=None
):
"""
Create a new Pulsar client instance.
Expand Down Expand Up @@ -405,6 +407,8 @@ def __init__(self, service_url,
Configure whether the Pulsar client validates that the hostname of the
endpoint, matches the common name on the TLS certificate presented by
the endpoint.
* `logger`:
Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
"""
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
Expand All @@ -417,6 +421,7 @@ def __init__(self, service_url,
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
_check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
_check_type_or_none(logging.Logger, logger, 'logger')

conf = _pulsar.ClientConfiguration()
if authentication:
Expand All @@ -427,6 +432,8 @@ def __init__(self, service_url,
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#


import logging
from unittest import TestCase, main
import time
import os
Expand Down Expand Up @@ -98,6 +99,10 @@ def test_consumer_config(self):
conf.consumer_name("my-name")
self.assertEqual(conf.consumer_name(), "my-name")

def test_client_logger(self):
logger = logging.getLogger("pulsar")
Client(self.serviceUrl, logger=logger)

def test_connect_error(self):
with self.assertRaises(pulsar.ConnectError):
client = Client('fakeServiceUrl')
Expand Down
101 changes: 101 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,106 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
return conf;
}

class LoggerWrapper: public Logger {
PyObject* _pyLogger;
int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);

void _updateCurrentPythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
_currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
};

int _getLogLevelValue(Level level) {
return 10 + (level * 10);
}

public:

LoggerWrapper(const std::string &logger, PyObject* pyLogger) {
_pyLogger = pyLogger;
Py_XINCREF(_pyLogger);

_updateCurrentPythonLogLevel();
}

LoggerWrapper(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
}

LoggerWrapper& operator=(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
return *this;
}

virtual ~LoggerWrapper() {
Py_XDECREF(_pyLogger);
}

bool isEnabled(Level level) {
return _getLogLevelValue(level) >= _currentPythonLogLevel;
}

void log(Level level, int line, const std::string& message) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file and line information is not logged, but I'm not familiar with the code enought to decide what is more appropriate. From the point of developer I would preffer the file and line information in the log message.

Probably someone more familiar with this code should speek his opinion.

Otherwise looks good.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to show example and you incentive to help with decission.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not having this info only truly matters if the Python Logger is connected ultimately to one or more Formatters that has tokens like filename, pathname or lineno in its format string.

It should be possible to create a LogRecord object (https://docs.python.org/3/library/logging.html#logrecord-objects) and pass it to Logger.handle method instead of using the simpler info, warning etc methods. I for one think it is not worth the trouble and the implementation is fine as-is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Bklyn, please consider approving the PR

PyGILState_STATE state = PyGILState_Ensure();

try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
break;
}

} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
}
};

class LoggerWrapperFactory : public LoggerFactory {
static LoggerWrapperFactory* _instance;
PyObject* _pyLogger;

public:
LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
}

virtual ~LoggerWrapperFactory() {
Py_XDECREF(_pyLogger);
}

Logger* getLogger(const std::string &fileName) {
return new LoggerWrapper(fileName, _pyLogger);
}
};

static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf, py::object logger) {
conf.setLogger(new LoggerWrapperFactory(logger));
return conf;
}


void export_config() {
using namespace boost::python;

Expand All @@ -110,6 +210,7 @@ void export_config() {
.def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection)
.def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection, return_self<>())
.def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>())
.def("set_logger", &ClientConfiguration_setLogger, return_self<>())
;

class_<ProducerConfiguration>("ProducerConfiguration")
Expand Down