diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 44c9d16e9..62c6a5407 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -8,6 +8,7 @@ from tornado.locks import Event from IPython.core.getipython import get_ipython +from IPython.core.inputtransformer2 import leading_empty_lines try: from jupyter_client.jsonutil import json_default @@ -286,6 +287,7 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session): self.stopped_threads = [] self.debugpy_initialized = False + self._removed_cleanup = {} self.debugpy_host = '127.0.0.1' self.debugpy_port = 0 @@ -341,12 +343,25 @@ def start(self): ident, msg = self.session.recv(self.shell_socket, mode=0) self.debugpy_initialized = msg['content']['status'] == 'ok' + + # Don't remove leading empty lines when debugging so the breakpoints are correctly positioned + cleanup_transforms = get_ipython().input_transformer_manager.cleanup_transforms + if leading_empty_lines in cleanup_transforms: + index = cleanup_transforms.index(leading_empty_lines) + self._removed_cleanup[index] = cleanup_transforms.pop(index) + self.debugpy_client.connect_tcp_socket() return self.debugpy_initialized def stop(self): self.debugpy_client.disconnect_tcp_socket() + # Restore remove cleanup transformers + cleanup_transforms = get_ipython().input_transformer_manager.cleanup_transforms + for index in sorted(self._removed_cleanup): + func = self._removed_cleanup.pop(index) + cleanup_transforms.insert(index, func) + async def dumpCell(self, message): code = message['arguments']['code'] file_name = get_file_name(code) diff --git a/ipykernel/tests/test_debugger.py b/ipykernel/tests/test_debugger.py index d7263ca9b..ab9f13a2c 100644 --- a/ipykernel/tests/test_debugger.py +++ b/ipykernel/tests/test_debugger.py @@ -1,6 +1,7 @@ +import sys import pytest -from .utils import new_kernel, get_reply +from .utils import TIMEOUT, new_kernel, get_reply seq = 0 @@ -8,7 +9,7 @@ pytest.importorskip("debugpy") -def wait_for_debug_request(kernel, command, arguments=None): +def wait_for_debug_request(kernel, command, arguments=None, full_reply=False): """Carry out a debug request and return the reply content. It does not check if the request was successful. @@ -27,7 +28,7 @@ def wait_for_debug_request(kernel, command, arguments=None): ) kernel.control_channel.send(msg) reply = get_reply(kernel, msg["header"]["msg_id"], channel="control") - return reply["content"] + return reply if full_reply else reply["content"] @pytest.fixture @@ -127,6 +128,76 @@ def test_set_breakpoints(kernel_with_debug): assert r["success"] +def test_stop_on_breakpoint(kernel_with_debug): + code = """def f(a, b): + c = a + b + return c + +f(2, 3)""" + + r = wait_for_debug_request(kernel_with_debug, "dumpCell", {"code": code}) + source = r["body"]["sourcePath"] + + wait_for_debug_request(kernel_with_debug, "debugInfo") + + wait_for_debug_request( + kernel_with_debug, + "setBreakpoints", + { + "breakpoints": [{"line": 2}], + "source": {"path": source}, + "sourceModified": False, + }, + ) + + wait_for_debug_request(kernel_with_debug, "configurationDone", full_reply=True) + + kernel_with_debug.execute(code) + + # Wait for stop on breakpoint + msg = {"msg_type": "", "content": {}} + while msg.get('msg_type') != 'debug_event' or msg["content"].get("event") != "stopped": + msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT) + + assert msg["content"]["body"]["reason"] == "breakpoint" + + +@pytest.mark.skipif(sys.version_info >= (3, 10), reason="TODO Does not work on Python 3.10") +def test_breakpoint_in_cell_with_leading_empty_lines(kernel_with_debug): + code = """ +def f(a, b): + c = a + b + return c + +f(2, 3)""" + + r = wait_for_debug_request(kernel_with_debug, "dumpCell", {"code": code}) + source = r["body"]["sourcePath"] + + wait_for_debug_request(kernel_with_debug, "debugInfo") + + wait_for_debug_request( + kernel_with_debug, + "setBreakpoints", + { + "breakpoints": [{"line": 6}], + "source": {"path": source}, + "sourceModified": False, + }, + ) + + wait_for_debug_request(kernel_with_debug, "configurationDone", full_reply=True) + + kernel_with_debug.execute(code) + + # Wait for stop on breakpoint + msg = {"msg_type": "", "content": {}} + while msg.get('msg_type') != 'debug_event' or msg["content"].get("event") != "stopped": + msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT) + + assert msg["content"]["body"]["reason"] == "breakpoint" + + def test_rich_inspect_not_at_breakpoint(kernel_with_debug): var_name = "text" value = "Hello the world"