Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions queue_job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from . import models
from . import jobrunner
from .hooks.post_init_hook import post_init_hook
from .hooks.uninstall_hook import uninstall_hook
1 change: 1 addition & 0 deletions queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
"development_status": "Mature",
"maintainers": ["guewen"],
"post_init_hook": "post_init_hook",
"uninstall_hook": "uninstall_hook",
}
13 changes: 13 additions & 0 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,16 @@ def create_test_job(
)

return delayed.db_record().uuid

@http.route("/queue_job/notify_db_listener", type="http", auth="user")
def notify_db_listener(self, action="add"):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))

with odoo.sql_db.db_connect("postgres").cursor() as cr_postgres:
cr_postgres.execute(
"notify queue_job_db_listener, %s",
("{} {}".format(action, http.request.env.cr.dbname),),
)

return ""
16 changes: 14 additions & 2 deletions queue_job/hooks/post_init_hook.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Copyright 2020 ACSONE SA/NV
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging

import odoo

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -31,3 +32,14 @@ def post_init_hook(cr, registry):
FOR EACH ROW EXECUTE PROCEDURE queue_job_notify();
"""
)

def notify():
logger.info("Notify jobrunner to add this new db")
with odoo.sql_db.db_connect("postgres").cursor() as cr_postgres:
cr_postgres.execute(
"notify queue_job_db_listener, %s", ("add {}".format(cr.dbname),)
)

# notify only when the module installation transaction has actually been committed
# https://github.com/odoo/odoo/blob/aeebe275/odoo/modules/loading.py#L242
cr.after("commit", notify)
15 changes: 15 additions & 0 deletions queue_job/hooks/uninstall_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging

import odoo

logger = logging.getLogger(__name__)


def uninstall_hook(cr, registry):
logger.info("Notify jobrunner to remove this db")
with odoo.sql_db.db_connect("postgres").cursor() as cr_postgres:
cr_postgres.execute(
"notify queue_job_db_listener, %s", ("remove {}".format(cr.dbname),)
)
76 changes: 76 additions & 0 deletions queue_job/jobrunner/dblistener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging
import select
import time
from threading import Thread

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5

_logger = logging.getLogger(__name__)


class DBListenerThread(Thread):
def __init__(self, jobrunner, connection_info):
Thread.__init__(self)
self.daemon = True
self.listener = DBListener(jobrunner, connection_info)

def run(self):
self.listener.run()

def stop(self):
self.listener.stop()


class DBListener(object):
def __init__(self, jobrunner, connection_info):
self.jobrunner = jobrunner
self.connection_info = connection_info

def run(self):
while not self.jobrunner._stop:
try:
_logger.info(
"connecting to db postgres@%(host)s:%(port)s", self.connection_info,
)
conn = psycopg2.connect(**self.connection_info)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with conn.cursor() as cr:
cr.execute("listen queue_job_db_listener")
conn.commit()
while True:
if select.select([conn], [], [], SELECT_TIMEOUT) == (
[],
[],
[],
):
pass
else:
conn.poll()
while conn.notifies:
payload = conn.notifies.pop().payload
_logger.info("received notification: %s", payload)
if payload.startswith("add"):
db_name = payload[4:]
# the module state is changed to installed in
# a different transaction than its installation:
# https://github.com/odoo/odoo/blob/aeebe275
# /odoo/modules/loading.py#L266
# so we wait a bit
time.sleep(2)
self.jobrunner.initialize_database(db_name)
elif payload.startswith("remove"):
db_name = payload[7:]
self.jobrunner.close_database(
db_name, remove_jobs=True
)
except Exception:
_logger.exception(
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
)
time.sleep(ERROR_RECOVERY_DELAY)
60 changes: 43 additions & 17 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import threading
import time
from contextlib import closing, contextmanager
from distutils.util import strtobool

import psycopg2
import requests
Expand All @@ -150,6 +151,7 @@

from . import queue_job_config
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
from .dblistener import DBListenerThread

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
Expand All @@ -165,6 +167,10 @@
# so we check it in addition to the environment variables.


def is_true(strval):
return bool(strtobool(strval or "0".lower()))


def _channels():
return (
os.environ.get("ODOO_QUEUE_JOB_CHANNELS")
Expand Down Expand Up @@ -396,25 +402,40 @@ def get_db_names(self):
db_names = odoo.service.db.exp_list(True)
return db_names

def close_database(self, db_name, remove_jobs=True):
if db_name not in self.db_by_name:
_logger.debug("database %s is unknown", db_name)
return
try:
db = self.db_by_name.pop(db_name)
if remove_jobs:
self.channel_manager.remove_db(db_name)
db.close()
_logger.info("closed database %s", db_name)
except Exception:
_logger.warning("error closing database %s", db_name, exc_info=True)

def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
try:
if remove_jobs:
self.channel_manager.remove_db(db_name)
db.close()
except Exception:
_logger.warning("error closing database %s", db_name, exc_info=True)
self.db_by_name = {}
# we use a copy of keys because dict.keys() returns an iterator
# which would otherwise raise a
# RuntimeError: dictionary changed size during iteration
for db_name in list(self.db_by_name.keys()):
self.close_database(db_name, remove_jobs)

def initialize_database(self, db_name):
db = Database(db_name)
if not db.has_queue_job:
_logger.debug("queue_job is not installed for db %s", db_name)
else:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)

def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
if db.has_queue_job:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)
self.initialize_database(db_name)

def run_jobs(self):
now = _odoo_now()
Expand Down Expand Up @@ -492,12 +513,17 @@ def stop(self):

def run(self):
_logger.info("starting")
if is_true(
os.environ.get("ODOO_QUEUE_JOB_JOBRUNNER_ENABLE_DBLISTENER")
) or queue_job_config.get("jobrunner_enable_dblistener", False):
self.listener_thread = DBListenerThread(
jobrunner=self, connection_info=_connection_info_for("postgres")
)
self.listener_thread.start()
while not self._stop:
# outer loop does exception recovery
try:
_logger.info("initializing database connections")
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
_logger.info("database connections ready")
# inner loop does the normal processing
Expand Down