diff --git a/queue_job/README.rst b/queue_job/README.rst index 529eba8745..4722105141 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -132,6 +132,23 @@ Configuration .. [1] It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. +* Deploying in high availability mode or odoo.sh: + +When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration +parameters mentioned above you need to also set the env variable +ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such: + +.. code-block:: ini + + (...) + [queue_job] + high_availability = 1 + + +> :warning: **Warning:** Failure to enable the high_availability flag on odoo.sh could +constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh +deployment + Usage ===== @@ -571,6 +588,12 @@ Known issues / Roadmap You must therefore requeue them manually, either from the Jobs view, or by running the following SQL statement *before starting Odoo*: +* When deployed in high_availability mode the allocated databases for the + jobrunners must be identical. If the databases are different and overlap + i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either + DB1 or DB3 will not proccess jobs because there can be only one leader per + sets of databases. + .. code-block:: sql update queue_job set state='pending' where state in ('started', 'enqueued') @@ -631,6 +654,8 @@ Contributors * Souheil Bejaoui * Eric Antones * Simone Orsi +* Paul Catinean +* Ruchir Shukla Maintainers ~~~~~~~~~~~ diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 1e0b07d445..05eede940e 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -145,6 +145,7 @@ import selectors import threading import time +import uuid from contextlib import closing, contextmanager import psycopg2 @@ -159,6 +160,7 @@ SELECT_TIMEOUT = 60 ERROR_RECOVERY_DELAY = 5 +LEADER_CHECK_DELAY = 10 _logger = logging.getLogger(__name__) @@ -192,7 +194,7 @@ def _odoo_now(): return _datetime_to_epoch(dt) -def _connection_info_for(db_name): +def _connection_info_for(db_name, jobrunner_ha_uuid=None): db_or_uri, connection_info = odoo.sql_db.connection_info_for(db_name) for p in ("host", "port", "user", "password"): @@ -202,6 +204,8 @@ def _connection_info_for(db_name): if cfg: connection_info[p] = cfg + if jobrunner_ha_uuid: + connection_info["application_name"] = "jobrunner_%s" % jobrunner_ha_uuid return connection_info @@ -260,14 +264,13 @@ def urlopen(): class Database(object): - def __init__(self, db_name): + def __init__(self, db_name, jobrunner_ha_uuid=None): self.db_name = db_name - connection_info = _connection_info_for(db_name) + self.jobrunner_ha_uuid = jobrunner_ha_uuid + connection_info = _connection_info_for(db_name, self.jobrunner_ha_uuid) self.conn = psycopg2.connect(**connection_info) self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.has_queue_job = self._has_queue_job() - if self.has_queue_job: - self._initialize() def close(self): # pylint: disable=except-pass @@ -280,6 +283,41 @@ def close(self): pass self.conn = None + def _check_leader(self, jobrunner_db_names): + """Check if the linked jobrunner is the leader of all jobrunner_db_names""" + if not self.jobrunner_ha_uuid: + return False + + with closing(self.conn.cursor()) as cr: + cr.execute( + """ + SELECT substring(application_name FROM 'jobrunner_(.*)') + FROM pg_stat_activity + WHERE application_name LIKE 'jobrunner_%%' AND + datname IN %s + ORDER BY backend_start, datname + LIMIT 1;""", + (jobrunner_db_names,), + ) + res = cr.fetchone() + leader_uuid = res[0] if res else "" + if leader_uuid != self.jobrunner_ha_uuid: + _logger.debug( + "jobrunner %s: not leader of db(s) [ %s ]. leader: %s. sleeping %s sec.", + self.jobrunner_ha_uuid, + ", ".join(jobrunner_db_names), + leader_uuid, + LEADER_CHECK_DELAY, + ) + return False + + _logger.info( + "jobrunner %s is now the leader of db(s) [ %s ]", + self.jobrunner_ha_uuid, + ", ".join(jobrunner_db_names), + ) + return True + def _has_queue_job(self): with closing(self.conn.cursor()) as cr: cr.execute( @@ -353,6 +391,7 @@ def __init__( user=None, password=None, channel_config_string=None, + high_availability=None, ): self.scheme = scheme self.host = host @@ -363,6 +402,10 @@ def __init__( if channel_config_string is None: channel_config_string = _channels() self.channel_manager.simple_configure(channel_config_string) + self.uuid = False + if high_availability: + self.uuid = str(uuid.uuid4()) + _logger.info("jobrunner %s initialized in HA mode", self.uuid) self.db_by_name = {} self._stop = False self._stop_pipe = os.pipe() @@ -388,12 +431,18 @@ def from_environ_or_config(cls): password = os.environ.get( "ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD" ) or queue_job_config.get("http_auth_password") + if "ODOO_QUEUE_JOB_HIGH_AVAILABILITY" in os.environ: + high_availability = str(os.environ["ODOO_QUEUE_JOB_HIGH_AVAILABILITY"]) + else: + high_availability = str(queue_job_config.get("high_availability")) + high_availability = high_availability.lower() in ("true", "1", "t") runner = cls( scheme=scheme or "http", host=host or "localhost", port=port or 8069, user=user, password=password, + high_availability=high_availability, ) return runner @@ -421,15 +470,20 @@ def close_databases(self, remove_jobs=True): _logger.warning("error closing database %s", db_name, exc_info=True) self.db_by_name = {} + def initialize_runner(self): + """Listen for db notifications and load existing jobs into memory""" + for db in self.db_by_name.values(): + db._initialize() + with db.select_jobs("state in %s", (NOT_DONE,)) as cr: + for job_data in cr: + self.channel_manager.notify(db.db_name, *job_data) + _logger.info("queue job runner ready for db %s", db.db_name) + def initialize_databases(self): for db_name in self.get_db_names(): - db = Database(db_name) + db = Database(db_name, self.uuid) if db.has_queue_job: self.db_by_name[db_name] = db - with db.select_jobs("state in %s", (NOT_DONE,)) as cr: - for job_data in cr: - self.channel_manager.notify(db_name, *job_data) - _logger.info("queue job runner ready for db %s", db_name) def run_jobs(self): now = _odoo_now() @@ -511,6 +565,17 @@ def stop(self): # wakeup the select() in wait_notification os.write(self._stop_pipe[1], b".") + def check_db_leader(self): + """Check if the current jobrunner is the leader for all configured databases""" + jobrunner_db_names = tuple(self.db_by_name.keys()) + + if not jobrunner_db_names: + return False + + # Use the first db connection to for leadership check + db_obj = self.db_by_name[jobrunner_db_names[0]] + return db_obj._check_leader(jobrunner_db_names) + def run(self): _logger.info("starting") while not self._stop: @@ -520,6 +585,13 @@ def run(self): # TODO: how to detect new databases or databases # on which queue_job is installed after server start? self.initialize_databases() + while not self._stop and self.uuid: + leader = self.check_db_leader() + if leader: + break + time.sleep(LEADER_CHECK_DELAY) + continue + self.initialize_runner() _logger.info("database connections ready") # inner loop does the normal processing while not self._stop: diff --git a/queue_job/readme/CONFIGURE.rst b/queue_job/readme/CONFIGURE.rst index d5782be57d..3baa577ec1 100644 --- a/queue_job/readme/CONFIGURE.rst +++ b/queue_job/readme/CONFIGURE.rst @@ -41,3 +41,20 @@ .. [1] It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. + +* Deploying in high availability mode or odoo.sh: + +When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration +parameters mentioned above you need to also set the env variable +ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such: + +.. code-block:: ini + + (...) + [queue_job] + high_availability = 1 + + +> :warning: **Warning:** Failure to enable the high_availability flag on odoo.sh could +constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh +deployment diff --git a/queue_job/readme/CONTRIBUTORS.rst b/queue_job/readme/CONTRIBUTORS.rst index 4b34823abe..b0e3ef267a 100644 --- a/queue_job/readme/CONTRIBUTORS.rst +++ b/queue_job/readme/CONTRIBUTORS.rst @@ -10,3 +10,5 @@ * Souheil Bejaoui * Eric Antones * Simone Orsi +* Paul Catinean +* Ruchir Shukla diff --git a/queue_job/readme/ROADMAP.rst b/queue_job/readme/ROADMAP.rst index 34cc20e6db..ced0ab63a9 100644 --- a/queue_job/readme/ROADMAP.rst +++ b/queue_job/readme/ROADMAP.rst @@ -13,6 +13,12 @@ You must therefore requeue them manually, either from the Jobs view, or by running the following SQL statement *before starting Odoo*: +* When deployed in high_availability mode the allocated databases for the + jobrunners must be identical. If the databases are different and overlap + i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either + DB1 or DB3 will not proccess jobs because there can be only one leader per + sets of databases. + .. code-block:: sql update queue_job set state='pending' where state in ('started', 'enqueued') diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 324ad909a9..d8f48244c6 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -492,6 +492,20 @@

Configuration

of running Odoo is obviously not for production purposes. +
    +
  • Deploying in high availability mode or odoo.sh:
  • +
+

When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration +parameters mentioned above you need to also set the env variable +ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:

+
+(...)
+[queue_job]
+high_availability = 1
+
+

> :warning: Warning: Failure to enable the high_availability flag on odoo.sh could +constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh +deployment

Usage

@@ -871,6 +885,11 @@

Known issues / Roadmap

therefore fill the running queue and prevent other jobs to start. You must therefore requeue them manually, either from the Jobs view, or by running the following SQL statement before starting Odoo: +
  • When deployed in high_availability mode the allocated databases for the +jobrunners must be identical. If the databases are different and overlap +i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either +DB1 or DB3 will not proccess jobs because there can be only one leader per +sets of databases.
  •  update queue_job set state='pending' where state in ('started', 'enqueued')
    @@ -930,6 +949,8 @@ 

    Contributors

  • Souheil Bejaoui <souheil.bejaoui@acsone.eu>
  • Eric Antones <eantones@nuobit.com>
  • Simone Orsi <simone.orsi@camptocamp.com>
  • +
  • Paul Catinean <pca@pledra.com>
  • +
  • Ruchir Shukla <ruchir@bizzappdev.com>