6 changes: 4 additions & 2 deletions base_import_async/models/base_import_import.py
@@ -59,7 +59,9 @@ def do(self, fields, columns, options, dryrun=False):
attachment = self._create_csv_attachment(
import_fields, data, options, self.file_name
)
delayed_job = self.with_delay(description=description)._split_file(
delayed_job = self.with_delay(
description=description, keep_context=True
)._split_file(
model_name=self.res_model,
translated_model_name=translated_model_name,
attachment=attachment,
@@ -162,7 +164,7 @@ def _split_file(
file_name=root + "-" + chunk + ext,
)
delayed_job = self.with_delay(
description=description, priority=priority
description=description, priority=priority, keep_context=True
)._import_one_chunk(
model_name=model_name, attachment=attachment, options=options
)
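For context, both call sites above now opt in to context propagation: the delayed _split_file and _import_one_chunk jobs run with the same context as the original import request (for example the user's language) instead of an empty one. A minimal sketch of the caller's perspective, where wizard_id, fields, columns and options are placeholders:

# Hedged sketch: only keep_context itself comes from this PR, the rest is illustrative.
wizard = env["base_import.import"].browse(wizard_id).with_context(lang="fr_FR")
# do() is the entry point patched above; with keep_context=True the queue.job
# records created for _split_file and _import_one_chunk keep lang="fr_FR",
# so the chunked imports execute with the same language as the request.
wizard.do(fields, columns, options)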
34 changes: 31 additions & 3 deletions queue_job/job.py
@@ -11,6 +11,7 @@
from datetime import datetime, timedelta

import odoo
from odoo.tools.safe_eval import safe_eval

from .exception import FailedJobError, NoSuchJobError, RetryableJobError

@@ -59,6 +60,7 @@ def __init__(
description=None,
channel=None,
identity_key=None,
keep_context=False,
):
self.recordset = recordset
self.priority = priority
@@ -67,6 +69,7 @@
self.description = description
self.channel = channel
self.identity_key = identity_key
self.keep_context = keep_context

def __getattr__(self, name):
if name in self.recordset:
@@ -88,6 +91,7 @@ def delay(*args, **kwargs):
description=self.description,
channel=self.channel,
identity_key=self.identity_key,
keep_context=self.keep_context,
)

return delay
@@ -339,6 +343,7 @@ def enqueue(
description=None,
channel=None,
identity_key=None,
keep_context=False,
):
"""Create a Job and enqueue it in the queue. Return the job uuid.

@@ -359,6 +364,7 @@
description=description,
channel=channel,
identity_key=identity_key,
keep_context=keep_context,
)
if new_job.identity_key:
existing = new_job.job_record_with_same_identity_key()
@@ -399,6 +405,7 @@ def __init__(
description=None,
channel=None,
identity_key=None,
keep_context=False,
):
""" Create a Job

@@ -423,8 +430,8 @@
:param identity_key: A hash to uniquely identify a job, or a function
that returns this hash (the function takes the job
as argument)
:param env: Odoo Environment
:type env: :class:`odoo.api.Environment`
:param keep_context: whether the current context should be stored with the job and restored when it is executed
:type keep_context: bool
"""
if args is None:
args = ()
@@ -445,6 +452,7 @@
self.recordset = recordset

self.env = env
self.keep_context = keep_context
self.job_model = self.env["queue.job"]
self.job_model_name = "queue.job"

@@ -594,8 +602,11 @@ def _store_values(self, create=False):
"records": self.recordset,
"args": self.args,
"kwargs": self.kwargs,
"context": "{}",
}
)
if self.keep_context:
vals.update({"context": str(self.env.context.copy())})

vals_from_model = self._store_values_from_model()
# Sanitize values: make sure you cannot screw core values
@@ -615,6 +626,22 @@ def _store_values_from_model(self):
vals = handler(self)
return vals

def _get_record_context(self):
"""
Build the context used to execute the job
"""
# restore the stored context (if any) and scope it to the job's company
company_ids = []
if self.company_id:
company_ids = [self.company_id]
context_txt = self.db_record().context or {}
if isinstance(context_txt, str):
context = safe_eval(context_txt)
else:
context = context_txt
context.update({"job_uuid": self.uuid, "allowed_company_ids": company_ids})
return context

@property
def func_string(self):
model = repr(self.recordset)
@@ -628,7 +655,8 @@ def db_record(self):

@property
def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
context = self._get_record_context()
recordset = self.recordset.with_context(**context)
return getattr(recordset, self.method_name)

@property
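To make the round-trip implemented in these hunks concrete, here is a small standalone sketch of the store/restore logic (not the module's code: plain dicts stand in for the environment and ast.literal_eval stands in for Odoo's safe_eval):

import ast

def store_context(env_context, keep_context=False):
    # Mirrors _store_values: the caller's context is persisted as a string
    # on the queue.job record only when keep_context is True.
    return str(dict(env_context)) if keep_context else "{}"

def restore_context(stored, job_uuid, company_id=None):
    # Mirrors _get_record_context: parse the stored string back into a dict,
    # then inject the job uuid and the company the job was created with.
    context = ast.literal_eval(stored) if isinstance(stored, str) else dict(stored)
    company_ids = [company_id] if company_id else []
    context.update({"job_uuid": job_uuid, "allowed_company_ids": company_ids})
    return context

stored = store_context({"lang": "fr_FR", "tz": "Europe/Brussels"}, keep_context=True)
print(restore_context(stored, job_uuid="0dd55333", company_id=1))
# {'lang': 'fr_FR', 'tz': 'Europe/Brussels', 'job_uuid': '0dd55333', 'allowed_company_ids': [1]}

Job.func then applies this dictionary to the recordset with with_context(**context) before the job method is called.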
4 changes: 4 additions & 0 deletions queue_job/models/base.py
@@ -44,6 +44,7 @@ def with_delay(
description=None,
channel=None,
identity_key=None,
keep_context=False,
):
""" Return a ``DelayableRecordset``

@@ -81,6 +82,8 @@
the new job will not be added. It is either a
string, either a function that takes the job as
argument (see :py:func:`..job.identity_exact`).
:param keep_context: if True, the current context is stored with the job
and restored on the recordset when the job is executed (default: False).
:return: instance of a DelayableRecordset
:rtype: :class:`odoo.addons.queue_job.job.DelayableRecordset`

@@ -108,6 +111,7 @@
description=description,
channel=channel,
identity_key=identity_key,
keep_context=keep_context,
)

def _patch_job_auto_delay(self, method_name, context_key=None):
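As a usage note for the new parameter, a minimal sketch of opting in per call (the model name and job method are hypothetical; only keep_context comes from this PR):

def enqueue_report_export(env, record_ids):
    records = env["my.module.report"].browse(record_ids)
    # Without keep_context (or with keep_context=False) the job is stored with an
    # empty context ("{}") and only job_uuid / allowed_company_ids are injected
    # at execution time. With keep_context=True the current env.context
    # (lang, tz, ...) is stored on the queue.job record and restored on the
    # recordset when the job runs.
    delayable = records.with_context(lang="fr_FR").with_delay(
        description="Export reports", keep_context=True
    )
    return delayable.export_data()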
6 changes: 6 additions & 0 deletions queue_job/models/queue_job.py
@@ -57,6 +57,12 @@ class QueueJob(models.Model):
comodel_name="res.company", string="Company", index=True
)
name = fields.Char(string="Description", readonly=True)
context = fields.Char(
string="Context Value",
default="{}",
help="Context dictionary as Python expression, empty by default (Default: {})",
readonly=True,
)

model_name = fields.Char(string="Model", readonly=True)
method_name = fields.Char(readonly=True)
3 changes: 3 additions & 0 deletions queue_job/views/queue_job_views.xml
@@ -60,6 +60,9 @@
<field name="exec_time" string="Time (s)" />
</group>
</group>
<group name="context_grp">
<field name="context" />
</group>
<group colspan="4">
<div>
<label for="retry" string="Current try / max. retries" />
93 changes: 93 additions & 0 deletions test_queue_job/tests/test_job.py
@@ -200,6 +200,49 @@ def test_store_extra_data(self):
stored.invalidate_cache()
self.assertEqual(stored.additional_info, "JUST_TESTING_BUT_FAILED")

def test_company_simple(self):
company = self.env.ref("base.main_company")
eta = datetime.now() + timedelta(hours=5)
test_job = Job(
self.method,
args=("o", "k"),
kwargs={"return_context": 1},
priority=15,
eta=eta,
description="My description",
)
test_job.worker_pid = 99999 # normally set on "set_start"
test_job.company_id = company.id
test_job.store()
job_read = Job.load(self.env, test_job.uuid)
self.assertEqual(test_job.func, job_read.func)
result_ctx = test_job.func(*tuple(test_job.args), **test_job.kwargs)
self.assertEqual(result_ctx.get("allowed_company_ids"), company.ids)

def test_company_complex(self):
company1 = self.env.ref("base.main_company")
company2 = company1.create({"name": "Queue job company"})
companies = company1 | company2
self.env.user.write({"company_ids": [(6, False, companies.ids)]})
# Ensure the main company is still the first one
self.assertEqual(self.env.user.company_id, company1)
eta = datetime.now() + timedelta(hours=5)
test_job = Job(
self.method,
args=("o", "k"),
kwargs={"return_context": 1},
priority=15,
eta=eta,
description="My description",
)
test_job.worker_pid = 99999 # normally set on "set_start"
test_job.company_id = company2.id
test_job.store()
job_read = Job.load(self.env, test_job.uuid)
self.assertEqual(test_job.func, job_read.func)
result_ctx = test_job.func(*tuple(test_job.args), **test_job.kwargs)
self.assertEqual(result_ctx.get("allowed_company_ids"), company2.ids)

def test_read(self):
eta = datetime.now() + timedelta(hours=5)
test_job = Job(
@@ -536,6 +579,56 @@ def test_context_uuid(self):
self.assertTrue(key_present)
self.assertEqual(result["job_uuid"], test_job._uuid)

def test_context_custom_keep_context_default(self):
"""
Use with_delay without specifying the 'keep_context' key.
This checks that the parameter defaults to False, so the
context should not be restored on the recordset.
"""
delayable = (
self.env["test.queue.job"].with_context(world_origin=42).with_delay()
)
test_job = delayable.testing_method()
self.assertEqual(test_job.db_record().context, "{}")
expected_ctx = {
"job_uuid": test_job.uuid,
"allowed_company_ids": [test_job.company_id],
}
self.assertEqual(test_job._get_record_context(), expected_ctx)

def test_context_custom_keep_context_false(self):
"""
Use with_delay with keep_context explicitly set to False.
The context should not be restored on the recordset.
"""
delayable = (
self.env["test.queue.job"]
.with_context(world_origin=42)
.with_delay(keep_context=False)
)
test_job = delayable.testing_method()
self.assertEqual(test_job.db_record().context, "{}")
expected_ctx = {
"job_uuid": test_job.uuid,
"allowed_company_ids": [test_job.company_id],
}
self.assertEqual(test_job._get_record_context(), expected_ctx)

def test_context_custom_keep_context_true(self):
"""
Use with_delay with keep_context set to True.
The context should be restored on the recordset.
"""
recordset = self.env["test.queue.job"].with_context(world_origin=42)
delayable = recordset.with_delay(keep_context=True)
test_job = delayable.testing_method()
self.assertEqual(test_job.db_record().context, str(recordset.env.context))
expected_ctx = recordset.env.context.copy()
expected_ctx.update(
{"job_uuid": test_job.uuid, "allowed_company_ids": [test_job.company_id]}
)
self.assertEqual(test_job._get_record_context(), expected_ctx)

def test_override_channel(self):
delayable = self.env["test.queue.job"].with_delay(channel="root.sub.sub")
test_job = delayable.testing_method(return_context=True)