diff --git a/queue_job/__init__.py b/queue_job/__init__.py
index 34b2e85788..373fbaedc8 100644
--- a/queue_job/__init__.py
+++ b/queue_job/__init__.py
@@ -3,3 +3,4 @@
 from . import models
 from . import jobrunner
 from .hooks.post_init_hook import post_init_hook
+from .hooks.uninstall_hook import uninstall_hook
diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index d7df69c9b0..c1fb9538e7 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -21,4 +21,5 @@
     "development_status": "Mature",
     "maintainers": ["guewen"],
     "post_init_hook": "post_init_hook",
+    "uninstall_hook": "uninstall_hook",
 }
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index a0814bbbd5..d13eb38764 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -142,3 +142,16 @@ def create_test_job(
         )
 
         return delayed.db_record().uuid
+
+    @http.route("/queue_job/notify_db_listener", type="http", auth="user")
+    def notify_db_listener(self, action="add"):
+        if not http.request.env.user.has_group("base.group_erp_manager"):
+            raise Forbidden(_("Access Denied"))
+
+        with odoo.sql_db.db_connect("postgres").cursor() as cr_postgres:
+            cr_postgres.execute(
+                "notify queue_job_db_listener, %s",
+                ("{} {}".format(action, http.request.env.cr.dbname),),
+            )
+
+        return ""
diff --git a/queue_job/hooks/post_init_hook.py b/queue_job/hooks/post_init_hook.py
index 1e1a469cdf..1389bf2803 100644
--- a/queue_job/hooks/post_init_hook.py
+++ b/queue_job/hooks/post_init_hook.py
@@ -1,8 +1,9 @@
-# Copyright 2020 ACSONE SA/NV
-# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
 
 import logging
 
+import odoo
+
 logger = logging.getLogger(__name__)
 
 
@@ -31,3 +32,14 @@ def post_init_hook(cr, registry):
         FOR EACH ROW EXECUTE PROCEDURE queue_job_notify();
         """
     )
+
+    def notify():
+        logger.info("Notify jobrunner to add this new db")
+        with odoo.sql_db.db_connect("postgres").cursor() as cr_postgres:
+            cr_postgres.execute(
+                "notify queue_job_db_listener, %s", ("add {}".format(cr.dbname),)
+            )
+
+    # notify only when the module installation transaction has actually been committed
+    # https://github.com/odoo/odoo/blob/aeebe275/odoo/modules/loading.py#L242
+    cr.after("commit", notify)
diff --git a/queue_job/hooks/uninstall_hook.py b/queue_job/hooks/uninstall_hook.py
new file mode 100644
index 0000000000..f7e41e628a
--- /dev/null
+++ b/queue_job/hooks/uninstall_hook.py
@@ -0,0 +1,15 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+
+import odoo
+
+logger = logging.getLogger(__name__)
+
+
+def uninstall_hook(cr, registry):
+    logger.info("Notify jobrunner to remove this db")
+    with odoo.sql_db.db_connect("postgres").cursor() as cr_postgres:
+        cr_postgres.execute(
+            "notify queue_job_db_listener, %s", ("remove {}".format(cr.dbname),)
+        )
diff --git a/queue_job/jobrunner/dblistener.py b/queue_job/jobrunner/dblistener.py
new file mode 100644
index 0000000000..fd1d600b91
--- /dev/null
+++ b/queue_job/jobrunner/dblistener.py
@@ -0,0 +1,76 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+import select
+import time
+from threading import Thread
+
+import psycopg2
+from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+
+SELECT_TIMEOUT = 60
+ERROR_RECOVERY_DELAY = 5
+
+_logger = logging.getLogger(__name__)
+
+
+class DBListenerThread(Thread):
+    def __init__(self, jobrunner, connection_info):
+        Thread.__init__(self)
+        self.daemon = True
+        self.listener = DBListener(jobrunner, connection_info)
+
+    def run(self):
+        self.listener.run()
+
+    def stop(self):
+        self.listener.stop()
+
+
+class DBListener(object):
+    def __init__(self, jobrunner, connection_info):
+        self.jobrunner = jobrunner
+        self.connection_info = connection_info
+
+    def run(self):
+        while not self.jobrunner._stop:
+            try:
+                _logger.info(
+                    "connecting to db postgres@%(host)s:%(port)s", self.connection_info,
+                )
+                conn = psycopg2.connect(**self.connection_info)
+                conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+                with conn.cursor() as cr:
+                    cr.execute("listen queue_job_db_listener")
+                conn.commit()
+                while True:
+                    if select.select([conn], [], [], SELECT_TIMEOUT) == (
+                        [],
+                        [],
+                        [],
+                    ):
+                        pass
+                    else:
+                        conn.poll()
+                        while conn.notifies:
+                            payload = conn.notifies.pop().payload
+                            _logger.info("received notification: %s", payload)
+                            if payload.startswith("add"):
+                                db_name = payload[4:]
+                                # the module state is changed to installed in
+                                # a different transaction than its installation:
+                                # https://github.com/odoo/odoo/blob/aeebe275
+                                # /odoo/modules/loading.py#L266
+                                # so we wait a bit
+                                time.sleep(2)
+                                self.jobrunner.initialize_database(db_name)
+                            elif payload.startswith("remove"):
+                                db_name = payload[7:]
+                                self.jobrunner.close_database(
+                                    db_name, remove_jobs=True
+                                )
+            except Exception:
+                _logger.exception(
+                    "exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
+                )
+                time.sleep(ERROR_RECOVERY_DELAY)
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 8dbe39d97c..8d5224e1a7 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -140,6 +140,7 @@
 import threading
 import time
 from contextlib import closing, contextmanager
+from distutils.util import strtobool
 
 import psycopg2
 import requests
@@ -150,6 +151,7 @@
 from . import queue_job_config
 from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
+from .dblistener import DBListenerThread
 
 SELECT_TIMEOUT = 60
 ERROR_RECOVERY_DELAY = 5
 
@@ -165,6 +167,10 @@
 # so we check it in addition to the environment variables.
 
 
+def is_true(strval):
+    return bool(strtobool(strval or "0".lower()))
+
+
 def _channels():
     return (
         os.environ.get("ODOO_QUEUE_JOB_CHANNELS")
@@ -396,25 +402,40 @@ def get_db_names(self):
             db_names = odoo.service.db.exp_list(True)
         return db_names
 
+    def close_database(self, db_name, remove_jobs=True):
+        if db_name not in self.db_by_name:
+            _logger.debug("database %s is unknown", db_name)
+            return
+        try:
+            db = self.db_by_name.pop(db_name)
+            if remove_jobs:
+                self.channel_manager.remove_db(db_name)
+            db.close()
+            _logger.info("closed database %s", db_name)
+        except Exception:
+            _logger.warning("error closing database %s", db_name, exc_info=True)
+
     def close_databases(self, remove_jobs=True):
-        for db_name, db in self.db_by_name.items():
-            try:
-                if remove_jobs:
-                    self.channel_manager.remove_db(db_name)
-                db.close()
-            except Exception:
-                _logger.warning("error closing database %s", db_name, exc_info=True)
-        self.db_by_name = {}
+        # we use a copy of keys because dict.keys() returns an iterator
+        # which would otherwise raise a
+        # RuntimeError: dictionary changed size during iteration
+        for db_name in list(self.db_by_name.keys()):
+            self.close_database(db_name, remove_jobs)
+
+    def initialize_database(self, db_name):
+        db = Database(db_name)
+        if not db.has_queue_job:
+            _logger.debug("queue_job is not installed for db %s", db_name)
+        else:
+            self.db_by_name[db_name] = db
+            with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
+                for job_data in cr:
+                    self.channel_manager.notify(db_name, *job_data)
+            _logger.info("queue job runner ready for db %s", db_name)
 
     def initialize_databases(self):
         for db_name in self.get_db_names():
-            db = Database(db_name)
-            if db.has_queue_job:
-                self.db_by_name[db_name] = db
-                with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
-                    for job_data in cr:
-                        self.channel_manager.notify(db_name, *job_data)
-                _logger.info("queue job runner ready for db %s", db_name)
+            self.initialize_database(db_name)
 
     def run_jobs(self):
         now = _odoo_now()
@@ -492,12 +513,17 @@ def stop(self):
 
     def run(self):
         _logger.info("starting")
+        if is_true(
+            os.environ.get("ODOO_QUEUE_JOB_JOBRUNNER_ENABLE_DBLISTENER")
+        ) or queue_job_config.get("jobrunner_enable_dblistener", False):
+            self.listener_thread = DBListenerThread(
+                jobrunner=self, connection_info=_connection_info_for("postgres")
+            )
+            self.listener_thread.start()
         while not self._stop:
             # outer loop does exception recovery
             try:
                 _logger.info("initializing database connections")
-                # TODO: how to detect new databases or databases
-                # on which queue_job is installed after server start?
                 self.initialize_databases()
                 _logger.info("database connections ready")
                 # inner loop does the normal processing