From 8e086327bbc969b9a272732e162e0331499e0e91 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 30 Sep 2020 17:05:24 +0200 Subject: [PATCH 1/4] Support multi-nodes with lock on jobrunner Starting several odoo (main) processes with "--load=web,queue_job" was unsupported, as it would start several job runner, which would all listen to postgresql notifications and try to enqueue jobs in concurrent workers. This is an issue in several cases: * it causes issues on odoo.sh that uses an hybrid model for workers and starts several job runners [0] * it defeats any setup that would use several nodes to keep the service available in case of failure of a node/host The solution implemented here is using a PostgreSQL advisory lock, at session level in a connection on the "postgres" database, which ensure 2 job runners are not working on the same set of databases. At loading, the job runner tries to acquire the lock. If it can, it initializes the connection and listen for jobs. If the lock is taken by another job runner, it waits and retry to acquire it every 30 seconds. Example when a job runner is started and another one starts: INFO ? odoo.addons.queue_job.jobrunner.runner: starting INFO ? odoo.addons.queue_job.jobrunner.runner: already started on another node The shared lock identifier is computed based on the set of databases the job runner has to listen to: if a job runner is started with ``--database=queue1`` and another with ``--database=queue2``, they will have different locks and such will be able to work in parallel. Important: new databases need a restart of the job runner. This was already the case, and would be a great improvement, but is out of scope for this improvement. [0] https://github.com/OCA/queue/issues/169#issuecomment-693407232 --- queue_job/jobrunner/runner.py | 135 ++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 5 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 8dbe39d97c..b88a47f62d 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -18,6 +18,35 @@ * It does not run jobs itself, but asks Odoo to run them through an anonymous ``/queue_job/runjob`` HTTP request. [1]_ +How does concurrent job runners work? +------------------------------------- + +If several nodes (on different hosts or not) of job runners are started, +a shared lock ensures that only one job runner works on a database at +a time. These rules are to take in consideration: + +* The identifier of the shared lock is based on the database list provided, + so either ``--database``/``db_name`` or all the databases in PostgreSQL. +* When 2 job runners with the exact same list of databases are started, + only the first one will work. The second one will wait and take over + if the first one is stopped. + +Caveats: + +* If 2 job runners have a database in common but a different list (e.g. + ``db_name=project1,project2`` and ``db_name=project2,project3``), both job + runners will work and listen to ``project2``, which will lead to unexpected + behavior. +* The same applies when no database is specified and all the cluster's databases + are used. If a job runner is started on the cluster's databases, a new database + is created and a second job runner is started, they'll both work on a same set + of databases with unexpected behaviors. +* PostgreSQL advisory locks are based on a integer, the list of database names + is sorted, hashed and converted to an int64, so we lose information in the + identifier. A low risk of collision is possible. If it happens some day, we + should add an option for a custom lock identifier. + + How to use it? -------------- @@ -134,6 +163,7 @@ """ import datetime +import hashlib import logging import os import select @@ -152,6 +182,8 @@ from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager SELECT_TIMEOUT = 60 +TRY_ACQUIRE_INTERVAL = 30 # seconds +SHARED_LOCK_KEEP_ALIVE = 60 # seconds ERROR_RECOVERY_DELAY = 5 _logger = logging.getLogger(__name__) @@ -251,7 +283,52 @@ def urlopen(): thread.start() -class Database(object): +class SharedLockDatabase(object): + def __init__(self, db_name, lock_name): + self.db_name = db_name + self.lock_ident = self.name_to_int64(lock_name) + connection_info = _connection_info_for(db_name) + self.conn = psycopg2.connect(**connection_info) + self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + self.acquired = False + self.try_acquire() + + @staticmethod + def name_to_int64(lock_name): + hasher = hashlib.sha256() + hasher.update(lock_name.encode("utf-8")) + # pg_try_advisory_lock is limited to an 8-byte (64bit) signed integer + return int.from_bytes(hasher.digest()[:8], byteorder="big", signed=True) + + def try_acquire(self): + self.acquired = self._acquire() + + def _acquire(self): + with closing(self.conn.cursor()) as cr: + cr.execute("SET idle_in_transaction_session_timeout = 60000;") + # session level lock + cr.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,)) + acquired = cr.fetchone()[0] + return acquired + + def keep_alive(self): + query = "SELECT 1" + with closing(self.conn.cursor()) as cr: + cr.execute(query) + + def close(self): + # pylint: disable=except-pass + # if close fail for any reason, it's either because it's already closed + # and we don't care, or for any reason but anyway it will be closed on + # del + try: + self.conn.close() + except Exception: + pass + self.conn = None + + +class QueueDatabase(object): def __init__(self, db_name): self.db_name = db_name connection_info = _connection_info_for(db_name) @@ -355,6 +432,11 @@ def __init__( if channel_config_string is None: channel_config_string = _channels() self.channel_manager.simple_configure(channel_config_string) + + self.shared_lock_db = None + # TODO: how to detect new databases or databases + # on which queue_job is installed after server start? + self.list_db_names = self.get_db_names() self.db_by_name = {} self._stop = False self._stop_pipe = os.pipe() @@ -404,11 +486,22 @@ def close_databases(self, remove_jobs=True): db.close() except Exception: _logger.warning("error closing database %s", db_name, exc_info=True) + self.db_by_name = {} + if self.shared_lock_db: + try: + self.shared_lock_db.close() + except Exception: + _logger.warning( + "error closing database %s", + self.shared_lock_db.db_name, + exc_info=True, + ) + def initialize_databases(self): - for db_name in self.get_db_names(): - db = Database(db_name) + for db_name in self.list_db_names: + db = QueueDatabase(db_name) if db.has_queue_job: self.db_by_name[db_name] = db with db.select_jobs("state in %s", (NOT_DONE,)) as cr: @@ -484,6 +577,12 @@ def wait_notification(self): for conn in conns: conn.poll() + def keep_alive_shared_lock(self): + self.shared_lock_db.keep_alive() + + def _lock_ident(self): + return "qj:{}".format("-".join(sorted(self.list_db_names))) + def stop(self): _logger.info("graceful stop requested") self._stop = True @@ -495,16 +594,42 @@ def run(self): while not self._stop: # outer loop does exception recovery try: + # When concurrent jobrunners are started, the first to win the + # race acquires an advisory lock on PostgreSQL and gets to + # work. When a jobrunner is stopped, the lock is released, and + # another node can take over. + self.shared_lock_db = SharedLockDatabase("postgres", self._lock_ident()) + if not self.shared_lock_db.acquired: + self.close_databases() + _logger.info("already started on another node") + # no database to work with... retry later in case a concurrent + # node is stopped + time.sleep(TRY_ACQUIRE_INTERVAL) + continue + _logger.info("initializing database connections") - # TODO: how to detect new databases or databases - # on which queue_job is installed after server start? self.initialize_databases() _logger.info("database connections ready") + + last_keep_alive = None + # inner loop does the normal processing while not self._stop: self.process_notifications() self.run_jobs() self.wait_notification() + if ( + not last_keep_alive + or time.time() >= last_keep_alive + SHARED_LOCK_KEEP_ALIVE + ): + last_keep_alive = time.time() + # send a keepalive on the shared lock connection at + # most every 60 seconds + self.keep_alive_shared_lock() + # TODO here, when we have no "db_name", we could list again + # the databases and if the list changed, try to acquire a new + # lock + except KeyboardInterrupt: self.stop() except InterruptedError: From fda7b195219f0d75b1ddabf5e594eaafab719e0f Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 30 Sep 2020 17:05:24 +0200 Subject: [PATCH 2/4] Support multi-nodes with lock on jobrunner Starting several odoo (main) processes with "--load=web,queue_job" was unsupported, as it would start several job runner, which would all listen to postgresql notifications and try to enqueue jobs in concurrent workers. This is an issue in several cases: * it causes issues on odoo.sh that uses an hybrid model for workers and starts several job runners [0] * it defeats any setup that would use several nodes to keep the service available in case of failure of a node/host The solution implemented here is using a PostgreSQL advisory lock, at session level in a connection on the "postgres" database, which ensure 2 job runners are not working on the same set of databases. At loading, the job runner tries to acquire the lock. If it can, it initializes the connection and listen for jobs. If the lock is taken by another job runner, it waits and retry to acquire it every 30 seconds. Example when a job runner is started and another one starts: INFO ? odoo.addons.queue_job.jobrunner.runner: starting INFO ? odoo.addons.queue_job.jobrunner.runner: already started on another node The shared lock identifier is computed based on the set of databases the job runner has to listen to: if a job runner is started with ``--database=queue1`` and another with ``--database=queue2``, they will have different locks and such will be able to work in parallel. Important: new databases need a restart of the job runner. This was already the case, and would be a great improvement, but is out of scope for this improvement. [0] https://github.com/OCA/queue/issues/169#issuecomment-693407232 From f599f8f6af0329df4b41aa8d62559d9643bb9cd3 Mon Sep 17 00:00:00 2001 From: Nils Hamerlinck Date: Mon, 5 Apr 2021 11:17:41 +0700 Subject: [PATCH 3/4] [FIX] idle_in_transaction_session_timeout --- queue_job/jobrunner/runner.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index b88a47f62d..1e5dcbe3e8 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -289,8 +289,8 @@ def __init__(self, db_name, lock_name): self.lock_ident = self.name_to_int64(lock_name) connection_info = _connection_info_for(db_name) self.conn = psycopg2.connect(**connection_info) - self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.acquired = False + self._keep_alive_cursor = None self.try_acquire() @staticmethod @@ -302,10 +302,22 @@ def name_to_int64(lock_name): def try_acquire(self): self.acquired = self._acquire() + if self.acquired: + # we open a transaction that we will never commit; + # at most every SHARED_LOCK_KEEP_ALIVE seconds we will + # keep it alive with a simple SELECT 1 query; + # if the process crashes or if the connection is cut, + # the pg server will terminate self.conn after + # 2*SHARED_LOCK_KEEP_ALIVE seconds, which will + # free the advisory lock and let another worker take over + self._keep_alive_cursor = self.conn.cursor() + self._keep_alive_cursor.execute( + "SET idle_in_transaction_session_timeout = %s;", + (SHARED_LOCK_KEEP_ALIVE * 1000 * 2,), + ) def _acquire(self): with closing(self.conn.cursor()) as cr: - cr.execute("SET idle_in_transaction_session_timeout = 60000;") # session level lock cr.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,)) acquired = cr.fetchone()[0] @@ -313,8 +325,7 @@ def _acquire(self): def keep_alive(self): query = "SELECT 1" - with closing(self.conn.cursor()) as cr: - cr.execute(query) + self._keep_alive_cursor.execute(query) def close(self): # pylint: disable=except-pass @@ -624,7 +635,7 @@ def run(self): ): last_keep_alive = time.time() # send a keepalive on the shared lock connection at - # most every 60 seconds + # most every SHARED_LOCK_KEEP_ALIVE seconds self.keep_alive_shared_lock() # TODO here, when we have no "db_name", we could list again # the databases and if the list changed, try to acquire a new From 6569fb082ece23f9bead000d2aa7bbaf2b48b598 Mon Sep 17 00:00:00 2001 From: Nils Hamerlinck Date: Mon, 5 Apr 2021 11:17:41 +0700 Subject: [PATCH 4/4] [FIX] idle_in_transaction_session_timeout