Skip to content

Commit 357c908

Browse files
authored
Correct use of asyncio.Lock to process a single control message at a time (#1416)
1 parent f96df51 commit 357c908

File tree

3 files changed

+139
-7
lines changed

3 files changed

+139
-7
lines changed

ipykernel/kernelbase.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ def _parent_header(self):
246246
# execution count we store in the shell.
247247
execution_count = 0
248248

249+
# Asyncio lock to ensure only one control queue message is processed at a time.
250+
_control_lock = Instance(asyncio.Lock)
251+
249252
msg_types = [
250253
"execute_request",
251254
"complete_request",
@@ -295,7 +298,7 @@ def __init__(self, **kwargs):
295298

296299
async def dispatch_control(self, msg):
297300
# Ensure only one control message is processed at a time
298-
async with asyncio.Lock():
301+
async with self._control_lock:
299302
await self.process_control(msg)
300303

301304
async def process_control(self, msg):
@@ -540,6 +543,10 @@ def schedule_dispatch(self, dispatch, *args):
540543
# ensure the eventloop wakes up
541544
self.io_loop.add_callback(lambda: None)
542545

546+
async def _create_control_lock(self):
547+
# This can be removed when minimum python increases to 3.10
548+
self._control_lock = asyncio.Lock()
549+
543550
def start(self):
544551
"""register dispatchers for streams"""
545552
self.io_loop = ioloop.IOLoop.current()
@@ -549,6 +556,14 @@ def start(self):
549556
if self.control_stream:
550557
self.control_stream.on_recv(self.dispatch_control, copy=False)
551558

559+
if self.control_thread and sys.version_info < (3, 10):
560+
# Before Python 3.10 we need to ensure the _control_lock is created in the
561+
# thread that uses it. When our minimum python is 3.10 we can remove this
562+
# and always use the else below, or just assign it where it is declared.
563+
self.control_thread.io_loop.add_callback(self._create_control_lock)
564+
else:
565+
self._control_lock = asyncio.Lock()
566+
552567
if self.shell_stream:
553568
self.shell_stream.on_recv(
554569
partial(

tests/test_debugger.py

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from .utils import TIMEOUT, get_reply, new_kernel
5+
from .utils import TIMEOUT, get_replies, get_reply, new_kernel
66

77
seq = 0
88

@@ -15,11 +15,8 @@
1515
debugpy = None
1616

1717

18-
def wait_for_debug_request(kernel, command, arguments=None, full_reply=False):
19-
"""Carry out a debug request and return the reply content.
20-
21-
It does not check if the request was successful.
22-
"""
18+
def prepare_debug_request(kernel, command, arguments=None):
19+
"""Prepare a debug request but do not send it."""
2320
global seq
2421
seq += 1
2522

@@ -32,6 +29,15 @@ def wait_for_debug_request(kernel, command, arguments=None, full_reply=False):
3229
"arguments": arguments or {},
3330
},
3431
)
32+
return msg
33+
34+
35+
def wait_for_debug_request(kernel, command, arguments=None, full_reply=False):
36+
"""Carry out a debug request and return the reply content.
37+
38+
It does not check if the request was successful.
39+
"""
40+
msg = prepare_debug_request(kernel, command, arguments)
3541
kernel.control_channel.send(msg)
3642
reply = get_reply(kernel, msg["header"]["msg_id"], channel="control")
3743
return reply if full_reply else reply["content"]
@@ -448,3 +454,96 @@ def my_test():
448454

449455
# Compare local and global variable
450456
assert global_var["value"] == local_var["value"] and global_var["type"] == local_var["type"] # noqa: PT018
457+
458+
459+
def test_debug_requests_sequential(kernel_with_debug):
460+
# Issue https://github.com/ipython/ipykernel/issues/1412
461+
# Control channel requests should be executed sequentially not concurrently.
462+
code = """def f(a, b):
463+
c = a + b
464+
return c
465+
466+
f(2, 3)"""
467+
468+
r = wait_for_debug_request(kernel_with_debug, "dumpCell", {"code": code})
469+
if debugpy:
470+
source = r["body"]["sourcePath"]
471+
else:
472+
assert r == {}
473+
source = "some path"
474+
475+
wait_for_debug_request(
476+
kernel_with_debug,
477+
"setBreakpoints",
478+
{
479+
"breakpoints": [{"line": 2}],
480+
"source": {"path": source},
481+
"sourceModified": False,
482+
},
483+
)
484+
485+
wait_for_debug_request(kernel_with_debug, "debugInfo")
486+
wait_for_debug_request(kernel_with_debug, "configurationDone")
487+
kernel_with_debug.execute(code)
488+
489+
if not debugpy:
490+
# Cannot stop on breakpoint if debugpy not installed
491+
return
492+
493+
# Wait for stop on breakpoint
494+
msg: dict = {"msg_type": "", "content": {}}
495+
while msg.get("msg_type") != "debug_event" or msg["content"].get("event") != "stopped":
496+
msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT)
497+
498+
stacks = wait_for_debug_request(kernel_with_debug, "stackTrace", {"threadId": 1})["body"][
499+
"stackFrames"
500+
]
501+
502+
scopes = wait_for_debug_request(kernel_with_debug, "scopes", {"frameId": stacks[0]["id"]})[
503+
"body"
504+
]["scopes"]
505+
506+
# Get variablesReference for both Locals and Globals.
507+
locals_ref = next(filter(lambda s: s["name"] == "Locals", scopes))["variablesReference"]
508+
globals_ref = next(filter(lambda s: s["name"] == "Globals", scopes))["variablesReference"]
509+
510+
msgs = []
511+
for ref in [locals_ref, globals_ref]:
512+
msgs.append(
513+
prepare_debug_request(kernel_with_debug, "variables", {"variablesReference": ref})
514+
)
515+
516+
# Send messages in quick succession.
517+
for msg in msgs:
518+
kernel_with_debug.control_channel.send(msg)
519+
520+
replies = get_replies(kernel_with_debug, [msg["msg_id"] for msg in msgs], channel="control")
521+
522+
# Check debug variable returns are correct.
523+
locals = replies[0]["content"]
524+
assert locals["success"]
525+
variables = locals["body"]["variables"]
526+
var = next(filter(lambda v: v["name"] == "a", variables))
527+
assert var["type"] == "int"
528+
assert var["value"] == "2"
529+
var = next(filter(lambda v: v["name"] == "b", variables))
530+
assert var["type"] == "int"
531+
assert var["value"] == "3"
532+
533+
globals = replies[1]["content"]
534+
assert globals["success"]
535+
variables = globals["body"]["variables"]
536+
537+
names = [v["name"] for v in variables]
538+
assert "function variables" in names
539+
assert "special variables" in names
540+
541+
# Check status iopub messages alternate between busy and idle.
542+
execution_states = []
543+
while len(execution_states) < 8:
544+
msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT)
545+
if msg["msg_type"] == "status":
546+
execution_states.append(msg["content"]["execution_state"])
547+
assert execution_states.count("busy") == 4
548+
assert execution_states.count("idle") == 4
549+
assert execution_states == ["busy", "idle"] * 4

tests/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,24 @@ def get_reply(kc, msg_id, timeout=TIMEOUT, channel="shell"):
6262
return reply
6363

6464

65+
def get_replies(kc, msg_ids: list[str], timeout=TIMEOUT, channel="shell"):
66+
# Get replies which may arrive in any order as they may be running on different subshells.
67+
# Replies are returned in the same order as the msg_ids, not in the order of arrival.
68+
count = 0
69+
replies = [None] * len(msg_ids)
70+
while count < len(msg_ids):
71+
get_msg = getattr(kc, f"get_{channel}_msg")
72+
reply = get_msg(timeout=timeout)
73+
try:
74+
msg_id = reply["parent_header"]["msg_id"]
75+
replies[msg_ids.index(msg_id)] = reply
76+
count += 1
77+
except ValueError:
78+
# Allow debugging ignored replies
79+
print(f"Ignoring reply not to any of {msg_ids}: {reply}")
80+
return replies
81+
82+
6583
def execute(code="", kc=None, **kwargs):
6684
"""wrapper for doing common steps for validating an execution request"""
6785
from .test_message_spec import validate_message

0 commit comments

Comments
 (0)