159159
160160SELECT_TIMEOUT = 60
161161ERROR_RECOVERY_DELAY = 5
162+ PG_ADVISORY_LOCK_ID = 2293787760715711918
162163
163164_logger = logging .getLogger (__name__ )
164165
165166select = selectors .DefaultSelector
166167
167168
169+ class MasterElectionLost (Exception ):
170+ pass
171+
172+
168173# Unfortunately, it is not possible to extend the Odoo
169174# server command line arguments, so we resort to environment variables
170175# to configure the runner (channels mostly).
@@ -262,10 +267,15 @@ def __init__(self, db_name):
262267 self .db_name = db_name
263268 connection_info = _connection_info_for (db_name )
264269 self .conn = psycopg2 .connect (** connection_info )
265- self .conn .set_isolation_level (ISOLATION_LEVEL_AUTOCOMMIT )
266- self .has_queue_job = self ._has_queue_job ()
267- if self .has_queue_job :
268- self ._initialize ()
270+ try :
271+ self .conn .set_isolation_level (ISOLATION_LEVEL_AUTOCOMMIT )
272+ self .has_queue_job = self ._has_queue_job ()
273+ if self .has_queue_job :
274+ self ._acquire_master_lock ()
275+ self ._initialize ()
276+ except BaseException :
277+ self .close ()
278+ raise
269279
270280 def close (self ):
271281 # pylint: disable=except-pass
@@ -278,6 +288,14 @@ def close(self):
278288 pass
279289 self .conn = None
280290
291+ def _acquire_master_lock (self ):
292+ """Acquire the master runner lock or raise MasterElectionLost"""
293+ with closing (self .conn .cursor ()) as cr :
294+ cr .execute ("SELECT pg_try_advisory_lock(%s)" , (PG_ADVISORY_LOCK_ID ,))
295+ if not cr .fetchone ()[0 ]:
296+ msg = f"could not acquire master runner lock on { self .db_name } "
297+ raise MasterElectionLost (msg )
298+
281299 def _has_queue_job (self ):
282300 with closing (self .conn .cursor ()) as cr :
283301 cr .execute (
@@ -413,14 +431,17 @@ def close_databases(self, remove_jobs=True):
413431 self .db_by_name = {}
414432
415433 def initialize_databases (self ):
416- for db_name in self .get_db_names ():
434+ for db_name in sorted (self .get_db_names ()):
435+ # sorting is important to avoid deadlocks in acquiring the master lock
417436 db = Database (db_name )
418437 if db .has_queue_job :
419438 self .db_by_name [db_name ] = db
420439 with db .select_jobs ("state in %s" , (NOT_DONE ,)) as cr :
421440 for job_data in cr :
422441 self .channel_manager .notify (db_name , * job_data )
423442 _logger .info ("queue job runner ready for db %s" , db_name )
443+ else :
444+ db .close ()
424445
425446 def run_jobs (self ):
426447 now = _odoo_now ()
@@ -507,7 +528,7 @@ def run(self):
507528 while not self ._stop :
508529 # outer loop does exception recovery
509530 try :
510- _logger .info ("initializing database connections" )
531+ _logger .debug ("initializing database connections" )
511532 # TODO: how to detect new databases or databases
512533 # on which queue_job is installed after server start?
513534 self .initialize_databases ()
@@ -522,6 +543,14 @@ def run(self):
522543 except InterruptedError :
523544 # Interrupted system call, i.e. KeyboardInterrupt during select
524545 self .stop ()
546+ except MasterElectionLost as e :
547+ _logger .debug (
548+ "master election lost: %s, sleeping %ds and retrying" ,
549+ e ,
550+ ERROR_RECOVERY_DELAY ,
551+ )
552+ self .close_databases ()
553+ time .sleep (ERROR_RECOVERY_DELAY )
525554 except Exception :
526555 _logger .exception (
527556 "exception: sleeping %ds and retrying" , ERROR_RECOVERY_DELAY
0 commit comments