From 9bce1fa1a7b302d145c86d5fb0e34e3d7c16f6bf Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Tue, 9 Feb 2021 22:30:34 +0100 Subject: [PATCH 01/14] Debugger implementation --- ipykernel/debugger.py | 287 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 ipykernel/debugger.py diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py new file mode 100644 index 000000000..32a785390 --- /dev/null +++ b/ipykernel/debugger.py @@ -0,0 +1,287 @@ +import logging +import os + +from zmq.utils import jsonapi +from traitlets import Instance + +from asyncio import Queue + +class DebugpyMessageQueue: + + HEADER = 'Content Length: ' + HEADER_LENGTH = 16 + SEPARATOR = '\r\n\r\n' + SEPARATOR_LENGTH = 4 + + def __init__(self, event_callback): + self.tcp_buffer = '' + self._reset_tcp_pos() + self.event_callback = event_callback + self.message_queue = Queue + + def _reset_tcp_pos(self): + self.header_pos = -1 + self.separator_pos = -1 + self.message_size = 0 + self.message_pos = -1 + + def _put_message(self, raw_msg): + # TODO: forward to iopub if this is an event message + msg = jsonapi.loads(raw_msg) + if mes['type'] == 'event': + self.event_callback(msg) + else: + self.message_queue.put_nowait(msg) + + def put_tcp_frame(self, frame): + self.tcp_buffer += frame + # TODO: not sure this is required + #self.tcp_buffer += frame.decode("utf-8") + + # Finds header + if self.header_pos == -1: + self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER, hint) + if self.header_pos == -1: + return + + #Finds separator + if self.separator_pos == -1: + hint = self.header_pos + DebugpyMessageQueue.HEADER_lenth + self.separator_pos = self.tcp_buffer.find(DebugpyMessageQueue.SEPARATOR, hint) + if self.separator_pos == -1: + return + + if self.message_pos == -1: + size_pos = self.header_pos + DebugpyMessageQueue.HEADER_lenth + self.message_pos = self.separator_pos + DebugpyMessageQueue.SEPARATOR_LENGTH + self.message_size = int(self.tcp_buf[size_pos:self.separator_pos]) + + if len(self.tcp_buffer - self.message_pos) < self.message_size: + return + + self._put_message(self.tcp_buf[self.message_pos:self.message_size]) + if len(self.tcp_buffer - self_message_pos) == self.message_size: + self.reset_buffer() + else: + self.tcp_buffer = self.tcp_buffer[self.message_pos + self.message_size:] + self._reset_tcp_pos() + + async def get_message(self): + return await self.message_queue.get() + + +class DebugpyClient: + + def __init__(self, debugpy_socket, debugpy_stream): + self.debugpy_socket = debugpy_socket + self.debugpy_stream = debugpy_stream + self.message_queue = DebugpyMessageQueue(self._forward_event) + self.wait_for_attach = True + self.init_event = asyncio.Event() + + def _forward_event(self, msg): + if msg['event'] == 'initialized': + self.init_event.set() + #TODO: send event to iopub + + def _send_request(self, msg): + content = jsonapi.dumps(msg) + content_length = len(content) + buf = DebugpyMessageQueue.HEADER + content_length + DebugpyMessageQueue.SEPARATOR + content_msg + self.debugpy_socket.send(buf) # TODO: pass routing_id + + async def _wait_for_reponse(self): + # Since events are never pushed to the message_queue + # we can safely assume the next message in queue + # will be an answer to the previous request + return await self.message_queue.get() + + async def _handle_init_sequence(self): + # 1] Waits for initialized event + await self.init_event.wait() + + # 2] Sends configurationDone request + configurationDone = { + 'type': 'request', + 'seq': int(self.init_event_message['seq']) + 1, + 'command': 'configurationDone' + } + self._send_request(configurationDone) + + # 3] Waits for configurationDone response + await self._wait_for_response() + + # 4] Waits for attachResponse and returns it + attach_rep = await self._wait_for_response() + return attach_rep + + async def send_dap_request(self, msg): + self._send_request(msg) + if self.wait_for_attach and msg['command'] == 'attach': + rep = await self._handle_init_sequence() + self.wait_for_attach = False + return rep + else: + rep = await self._wait_for_reponse() + return rep + +class Debugger: + + # Requests that requires that the debugger has started + started_debug_msg_types = [ + 'dumpCell', 'setBreakpoints', + 'source', 'stackTrace', + 'variables', 'attach', + 'configurationDone' + ] + + # Requests that can be handled even if the debugger is not running + static_debug_msg_types = [ + 'debugInfo', 'inspectVariables' + ] + + log = Instance(logging.Logger, allow_none=True) + + def __init__(self): + self.is_started = False + + self.header = '' + + self.started_debug_handlers = {} + for msg_type in started_debug_msg_types: + self.started_debug_handlers[msg_type] = getattr(self, msg_type) + + self.static_debug_handlers = {} + for msg_type in static_debug_msg_types: + self.static_debug_handlers[msg_type] = getattr(self, msg_type) + + self.breakpoint_list = {} + self.stopped_threads = [] + + async def _forward_message(self, msg): + return await self.debugpy_client.send_dap_request(msg) + + def start(self): + return False + + def stop(self): + pass + + def dumpCell(self, message): + return {} + + async def setBreakpoints(self, message): + source = message['arguments']['source']['path']; + self.breakpoint_list[source] = message['arguments']['breakpoints'] + return await self._forward_message(message); + + def source(self, message): + reply = { + 'type': 'response', + 'request_seq': message['seq'], + 'command': message['command'] + } + source_path = message['arguments']['source']['path']; + if os.path.isfile(source_path): + with open(source_path) as f: + reply['success'] = True + reply['body'] = { + 'content': f.read() + } + + else: + reply['success'] = False + reply['message'] = 'source unavailable' + reply['body'] = {} + + return reply + + async def stackTrace(self, message): + reply = await self._forward_message(message) + reply['body']['stackFrames'] = + [frame for frame in reply['body']['stackFrames'] if frame['source']['path'] != ''] + return reply + + async def variables(self, message): + reply = await self._forward_message(message) + # TODO : check start and count arguments work as expected in debugpy + return reply + + async def attach(self, message): + message['arguments']['connect'] = { + 'host': self.debugpy_host, + 'port': self.debugpy_port + } + message['arguments']['logToFile'] = True + return await self._forward_message(message) + + def configurationDone(self, message): + reply = { + 'seq': message['seq'], + 'type': 'response', + 'request_seq': message['seq'], + 'success': True, + 'command': message['command'] + } + return reply; + + def debugInfo(self, message): + reply = { + 'type': 'response', + 'request_seq': message['seq'], + 'success': True, + 'command': message['command'], + 'body': { + 'isStarted': self.is_started, + 'hashMethod': 'Murmur2', + 'hashSeed': 0, + 'tmpFilePrefix': 'coincoin', + 'tmpFileSuffix': '.py', + 'breakpoints': self.breakpoint_list, + 'stoppedThreads': self.stopped_threads + } + } + return reply + + def inspectVariables(self, message): + return {} + + async def process_request(self, header, message): + reply = {} + + if message['command'] == 'initialize': + if self.is_started: + self.log.info('The debugger has already started') + else: + self.is_started = self.start() + if self.is_started: + self.log.info('The debugger has started') + else: + reply = { + 'command', 'initialize', + 'request_seq', message['seq'], + 'seq', 3, + 'success', False, + 'type', 'response' + } + + handler = self.static_debug_handlers.get(message['command'], None) + if handler is not None: + reply = await handler(message) + elif self.is_started: + self.header = header + handler = self.started_debug_handlers.get(message['command'], None) + if handler is not None: + reply = await handler(message) + else + reply = await self._forward_message(message) + + if message['command'] == 'disconnect': + self.stop() + self.breakpoint_list = {} + self.stopped_threads = [] + self.is_started = False + self.log.info('The debugger has stopped') + + return reply + From a0be3997e2b7c5c856f492f117be652ef7c495cd Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Wed, 10 Feb 2021 18:08:25 +0100 Subject: [PATCH 02/14] Plugged debugger --- ipykernel/debugger.py | 73 ++++++++++++++++++++++++++++------------- ipykernel/kernelapp.py | 18 ++++++++++ ipykernel/kernelbase.py | 46 +++++++++++++++++++++++++- ipykernel/kernelspec.py | 1 + 4 files changed, 115 insertions(+), 23 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 32a785390..40c7a65f1 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -2,9 +2,10 @@ import os from zmq.utils import jsonapi -from traitlets import Instance +#from traitlets import Instance +#from traitlets.config.configurable import SingletonConfigurable -from asyncio import Queue +from asyncio import (Event, Queue) class DebugpyMessageQueue: @@ -26,7 +27,6 @@ def _reset_tcp_pos(self): self.message_pos = -1 def _put_message(self, raw_msg): - # TODO: forward to iopub if this is an event message msg = jsonapi.loads(raw_msg) if mes['type'] == 'event': self.event_callback(msg) @@ -72,23 +72,23 @@ async def get_message(self): class DebugpyClient: - def __init__(self, debugpy_socket, debugpy_stream): - self.debugpy_socket = debugpy_socket + def __init__(self, debugpy_stream, event_callback): self.debugpy_stream = debugpy_stream + self.event_callback = event_callback self.message_queue = DebugpyMessageQueue(self._forward_event) self.wait_for_attach = True - self.init_event = asyncio.Event() + self.init_event = Event() def _forward_event(self, msg): if msg['event'] == 'initialized': self.init_event.set() - #TODO: send event to iopub + self.event_callback(msg) def _send_request(self, msg): content = jsonapi.dumps(msg) content_length = len(content) buf = DebugpyMessageQueue.HEADER + content_length + DebugpyMessageQueue.SEPARATOR + content_msg - self.debugpy_socket.send(buf) # TODO: pass routing_id + self.debugpy_stream.send(buf) # TODO: pass routing_id async def _wait_for_reponse(self): # Since events are never pushed to the message_queue @@ -115,6 +115,9 @@ async def _handle_init_sequence(self): attach_rep = await self._wait_for_response() return attach_rep + def receive_dap_frame(self, frame): + self.message_queue.put_tcp_frame(frame) + async def send_dap_request(self, msg): self._send_request(msg) if self.wait_for_attach and msg['command'] == 'attach': @@ -140,19 +143,20 @@ class Debugger: 'debugInfo', 'inspectVariables' ] - log = Instance(logging.Logger, allow_none=True) + #log = Instance(logging.Logger, allow_none=True) - def __init__(self): + def __init__(self, debugpy_stream, event_callback, shell_socket, session): + self.debugpy_client = DebugpyClient(debugpy_stream, event_callback) + self.shell_socket = shell_socket + self.session = session self.is_started = False - - self.header = '' self.started_debug_handlers = {} - for msg_type in started_debug_msg_types: + for msg_type in Debugger.started_debug_msg_types: self.started_debug_handlers[msg_type] = getattr(self, msg_type) self.static_debug_handlers = {} - for msg_type in static_debug_msg_types: + for msg_type in Debugger.static_debug_msg_types: self.static_debug_handlers[msg_type] = getattr(self, msg_type) self.breakpoint_list = {} @@ -161,10 +165,27 @@ def __init__(self): async def _forward_message(self, msg): return await self.debugpy_client.send_dap_request(msg) + @property + def tcp_client(self): + return self.debugpy_client + def start(self): + endpoint = self.debugpy_client.debugpy_stream.socket.getsockopt(zmq.LAST_ENDPOINT) + index = endpoit.rfind(':') + port = endpoint[index+1:] + code = 'import debugpy;' + code += 'debugpy.listen(("127.0.0.1",' + port + '))' + content = { + 'code': code, + 'slient': True + } + self.session.send(self.shell_socket, 'execute_request', content, + None, (self.shell_socket.getsockopt(zmq.ROUTING_ID))) + return False def stop(self): + # TODO pass def dumpCell(self, message): @@ -198,7 +219,7 @@ def source(self, message): async def stackTrace(self, message): reply = await self._forward_message(message) - reply['body']['stackFrames'] = + reply['body']['stackFrames'] = \ [frame for frame in reply['body']['stackFrames'] if frame['source']['path'] != ''] return reply @@ -215,7 +236,7 @@ async def attach(self, message): message['arguments']['logToFile'] = True return await self._forward_message(message) - def configurationDone(self, message): + async def configurationDone(self, message): reply = { 'seq': message['seq'], 'type': 'response', @@ -225,7 +246,13 @@ def configurationDone(self, message): } return reply; - def debugInfo(self, message): + async def debugInfo(self, message): + breakpoint_list = [] + for key, value in self.breakpoint_list.items(): + breakpoint_list.append({ + 'source': key, + 'breakpoints': value + }) reply = { 'type': 'response', 'request_seq': message['seq'], @@ -235,18 +262,21 @@ def debugInfo(self, message): 'isStarted': self.is_started, 'hashMethod': 'Murmur2', 'hashSeed': 0, - 'tmpFilePrefix': 'coincoin', + 'tmpFilePrefix': '/tmp/ipykernel_debugger', 'tmpFileSuffix': '.py', - 'breakpoints': self.breakpoint_list, + 'breakpoints': breakpoint_list, 'stoppedThreads': self.stopped_threads } } + #self.log.info("returning reply %s", reply) + print("DEBUGGER: ", reply) return reply def inspectVariables(self, message): + # TODO return {} - async def process_request(self, header, message): + async def process_request(self, message): reply = {} if message['command'] == 'initialize': @@ -269,11 +299,10 @@ async def process_request(self, header, message): if handler is not None: reply = await handler(message) elif self.is_started: - self.header = header handler = self.started_debug_handlers.get(message['command'], None) if handler is not None: reply = await handler(message) - else + else: reply = await self._forward_message(message) if message['command'] == 'disconnect': diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index c7756fa2c..86dfb098c 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -122,6 +122,8 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, context = Any() shell_socket = Any() control_socket = Any() + debugpy_socket = Any() + debug_shell_socket = Any() stdin_socket = Any() iopub_socket = Any() iopub_thread = Any() @@ -294,6 +296,16 @@ def init_control(self, context): self.control_port = self._bind_socket(self.control_socket, self.control_port) self.log.debug("control ROUTER Channel on port: %i" % self.control_port) + self.debugpy_socket = context.socket(zmq.STREAM) + self.debugpy_socket.linger = 1000 + self.debugpy_port = 0 + self.debugpy_port = self._bind_socket(self.debugpy_socket, self.debugpy_port) + self.log.debug("debugpy STREAM Channel on port: %i" % self.debugpy_port) + + self.debug_shell_socket = context.socket(zmq.DEALER) + self.debug_shell_socket.linger = 1000 + self.debug_shell_socket.connect(self.shell_socket.getsockopt(zmq.LAST_ENDPOINT)) + if hasattr(zmq, 'ROUTER_HANDOVER'): # set router-handover to workaround zeromq reconnect problems # in certain rare circumstances @@ -335,6 +347,9 @@ def close(self): self.log.debug("Closing iopub channel") self.iopub_thread.stop() self.iopub_thread.close() + + self.debug_shell_socket.close() + for channel in ('shell', 'control', 'stdin'): self.log.debug("Closing %s channel", channel) socket = getattr(self, channel + "_socket", None) @@ -449,11 +464,14 @@ def init_kernel(self): """Create the Kernel object itself""" shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) + debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread.io_loop) self.control_thread.start() kernel_factory = self.kernel_class.instance kernel = kernel_factory(parent=self, session=self.session, control_stream=control_stream, + debugpy_stream=debugpy_stream, + debug_shell_socket=self.debug_shell_socket, shell_stream=shell_stream, iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 4d26a73a7..e12d893d0 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -12,6 +12,7 @@ import time import uuid import warnings +import asyncio try: # jupyter_client >= 5, use tz-aware now @@ -39,6 +40,7 @@ from ._version import kernel_protocol_version +from .debugger import Debugger class Kernel(SingletonConfigurable): @@ -69,6 +71,10 @@ def shell_streams(self): return [shell_stream] control_stream = Instance(ZMQStream, allow_none=True) + debugpy_stream = Instance(ZMQStream, allow_none=True) + + debug_shell_socket = Any() + iopub_socket = Any() iopub_thread = Any() stdin_socket = Any() @@ -160,7 +166,7 @@ def _default_ident(self): 'apply_request', ] # add deprecated ipyparallel control messages - control_msg_types = msg_types + ['clear_request', 'abort_request'] + control_msg_types = msg_types + ['clear_request', 'abort_request', 'debug_request'] def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) @@ -173,6 +179,16 @@ def __init__(self, **kwargs): for msg_type in self.control_msg_types: self.control_handlers[msg_type] = getattr(self, msg_type) + self.debugger = Debugger(self.debugpy_stream, + self._publish_debug_event, + self.debug_shell_socket, + self.session) + + @gen.coroutine + def dispatch_debugpy(self, msg): + self.log.debug("Debugpy received: %s", msg) + selg.debugger.tcp_client.receive_dap_frame(msg) + @gen.coroutine def dispatch_control(self, msg): """dispatch control requests""" @@ -366,7 +382,12 @@ def dispatch_queue(self): Ensures that only one message is processing at a time, even when the handler is async """ + while True: + # ensure control stream is flushed before processing shell messages + if self.control_stream: + self.control_stream.flush() + # receive the next message and handle it try: yield self.process_one() except Exception: @@ -401,6 +422,7 @@ def start(self): self.io_loop.add_callback(self.dispatch_queue) self.control_stream.on_recv(self.dispatch_control, copy=False) + self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False) self.shell_stream.on_recv( partial( @@ -442,6 +464,13 @@ def _publish_status(self, status, channel, parent=None): parent=parent or self._parent_header[channel], ident=self._topic('status'), ) + def _publish_debug_event(self, event): + self.session.send(self.iopub_socket, + 'debug_event', + event, + parent=self._parent_header['control'], + ident=self._topic('debug_event') + ) def set_parent(self, ident, parent, channel='shell'): """Set the current parent_header @@ -519,6 +548,7 @@ def execute_request(self, stream, ident, parent): ) ) + self.log.debug("EXECUTE_REPLY: %s", reply_content) # Flush output before sending the reply. sys.stdout.flush() sys.stderr.flush() @@ -693,6 +723,20 @@ def do_is_complete(self, code): return {'status' : 'unknown', } + @gen.coroutine + def debug_request(self, stream, ident, parent): + content = parent['content'] + + reply_content = yield gen.maybe_future(self.do_debug_request(content)) + reply_content = json_clean(reply_content) + reply_msg = self.session.send(stream, 'debug_reply', reply_content, + parent, ident) + self.log.debug("%s", reply_msg) + + @gen.coroutine + def do_debug_request(self, msg): + return (yield self.debugger.process_request(msg)) + #--------------------------------------------------------------------------- # Engine methods (DEPRECATED) #--------------------------------------------------------------------------- diff --git a/ipykernel/kernelspec.py b/ipykernel/kernelspec.py index f37f98e19..deda6ebec 100644 --- a/ipykernel/kernelspec.py +++ b/ipykernel/kernelspec.py @@ -55,6 +55,7 @@ def get_kernel_dict(extra_arguments=None): 'argv': make_ipkernel_cmd(extra_arguments=extra_arguments), 'display_name': 'Python %i' % sys.version_info[0], 'language': 'python', + 'metadata': { 'debugger': True} } From b6731f338c229b879260cb53a00a91130632e242 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 12 Feb 2021 16:11:57 +0100 Subject: [PATCH 03/14] Fixed starting sequence --- ipykernel/debugger.py | 115 ++++++++++++++++++++++++++-------------- ipykernel/kernelapp.py | 3 -- ipykernel/kernelbase.py | 10 ++-- 3 files changed, 80 insertions(+), 48 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 40c7a65f1..e55071a0f 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -1,24 +1,25 @@ import logging import os +import zmq from zmq.utils import jsonapi -#from traitlets import Instance -#from traitlets.config.configurable import SingletonConfigurable -from asyncio import (Event, Queue) +from tornado.queues import Queue +from tornado.locks import Event class DebugpyMessageQueue: - HEADER = 'Content Length: ' + HEADER = 'Content-Length: ' HEADER_LENGTH = 16 SEPARATOR = '\r\n\r\n' SEPARATOR_LENGTH = 4 - def __init__(self, event_callback): + def __init__(self, event_callback, log): self.tcp_buffer = '' self._reset_tcp_pos() self.event_callback = event_callback - self.message_queue = Queue + self.message_queue = Queue() + self.log = log def _reset_tcp_pos(self): self.header_pos = -1 @@ -27,42 +28,56 @@ def _reset_tcp_pos(self): self.message_pos = -1 def _put_message(self, raw_msg): + self.log.debug('QUEUE - _put_message:') msg = jsonapi.loads(raw_msg) - if mes['type'] == 'event': + if msg['type'] == 'event': + self.log.debug('QUEUE - received event:') + self.log.debug(msg) self.event_callback(msg) else: + self.log.debug('QUEUE - put message:') + self.log.debug(msg) self.message_queue.put_nowait(msg) def put_tcp_frame(self, frame): self.tcp_buffer += frame - # TODO: not sure this is required - #self.tcp_buffer += frame.decode("utf-8") + self.log.debug('QUEUE - received frame') # Finds header if self.header_pos == -1: - self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER, hint) + self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER) if self.header_pos == -1: return + self.log.debug('QUEUE - found header at pos %i', self.header_pos) + #Finds separator if self.separator_pos == -1: - hint = self.header_pos + DebugpyMessageQueue.HEADER_lenth + hint = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH self.separator_pos = self.tcp_buffer.find(DebugpyMessageQueue.SEPARATOR, hint) if self.separator_pos == -1: return + self.log.debug('QUEUE - found separator at pos %i', self.separator_pos) + if self.message_pos == -1: - size_pos = self.header_pos + DebugpyMessageQueue.HEADER_lenth + size_pos = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH self.message_pos = self.separator_pos + DebugpyMessageQueue.SEPARATOR_LENGTH - self.message_size = int(self.tcp_buf[size_pos:self.separator_pos]) + self.message_size = int(self.tcp_buffer[size_pos:self.separator_pos]) - if len(self.tcp_buffer - self.message_pos) < self.message_size: + self.log.debug('QUEUE - found message at pos %i', self.message_pos) + self.log.debug('QUEUE - message size is %i', self.message_size) + + if len(self.tcp_buffer) - self.message_pos < self.message_size: return - self._put_message(self.tcp_buf[self.message_pos:self.message_size]) - if len(self.tcp_buffer - self_message_pos) == self.message_size: - self.reset_buffer() + self._put_message(self.tcp_buffer[self.message_pos:self.message_pos + self.message_size]) + if len(self.tcp_buffer) - self.message_pos == self.message_size: + self.log.debug('QUEUE - resetting tcp_buffer') + self.tcp_buffer = '' + self._reset_tcp_pos() else: + self.log.debug('QUEUE - slicing tcp_buffer') self.tcp_buffer = self.tcp_buffer[self.message_pos + self.message_size:] self._reset_tcp_pos() @@ -72,29 +87,40 @@ async def get_message(self): class DebugpyClient: - def __init__(self, debugpy_stream, event_callback): + def __init__(self, log, debugpy_stream, event_callback): + self.log = log self.debugpy_stream = debugpy_stream + self.routing_id = None self.event_callback = event_callback - self.message_queue = DebugpyMessageQueue(self._forward_event) + self.message_queue = DebugpyMessageQueue(self._forward_event, self.log) self.wait_for_attach = True self.init_event = Event() + self.init_event_seq = -1 def _forward_event(self, msg): if msg['event'] == 'initialized': self.init_event.set() + self.init_event_seq = msg['seq'] self.event_callback(msg) def _send_request(self, msg): + if self.routing_id is None: + self.routing_id = self.debugpy_stream.socket.getsockopt(zmq.ROUTING_ID) content = jsonapi.dumps(msg) - content_length = len(content) - buf = DebugpyMessageQueue.HEADER + content_length + DebugpyMessageQueue.SEPARATOR + content_msg - self.debugpy_stream.send(buf) # TODO: pass routing_id + content_length = str(len(content)) + buf = (DebugpyMessageQueue.HEADER + content_length + DebugpyMessageQueue.SEPARATOR).encode('ascii') + buf += content + self.log.debug("DEBUGPYCLIENT:") + self.log.debug(self.routing_id) + self.log.debug(buf) + self.debugpy_stream.send_multipart((self.routing_id, buf)) + #self.debugpy_stream.send(buf) # TODO: pass routing_id - async def _wait_for_reponse(self): + async def _wait_for_response(self): # Since events are never pushed to the message_queue # we can safely assume the next message in queue # will be an answer to the previous request - return await self.message_queue.get() + return await self.message_queue.get_message() async def _handle_init_sequence(self): # 1] Waits for initialized event @@ -103,7 +129,7 @@ async def _handle_init_sequence(self): # 2] Sends configurationDone request configurationDone = { 'type': 'request', - 'seq': int(self.init_event_message['seq']) + 1, + 'seq': int(self.init_event_seq) + 1, 'command': 'configurationDone' } self._send_request(configurationDone) @@ -125,7 +151,9 @@ async def send_dap_request(self, msg): self.wait_for_attach = False return rep else: - rep = await self._wait_for_reponse() + rep = await self._wait_for_response() + self.log.debug('DEBUGPYCLIENT - returning:') + self.log.debug(rep) return rep class Debugger: @@ -143,10 +171,9 @@ class Debugger: 'debugInfo', 'inspectVariables' ] - #log = Instance(logging.Logger, allow_none=True) - - def __init__(self, debugpy_stream, event_callback, shell_socket, session): - self.debugpy_client = DebugpyClient(debugpy_stream, event_callback) + def __init__(self, log, debugpy_stream, event_callback, shell_socket, session): + self.log = log + self.debugpy_client = DebugpyClient(log, debugpy_stream, event_callback) self.shell_socket = shell_socket self.session = session self.is_started = False @@ -162,6 +189,9 @@ def __init__(self, debugpy_stream, event_callback, shell_socket, session): self.breakpoint_list = {} self.stopped_threads = [] + self.debugpy_host = '127.0.0.1' + self.debugpy_port = 0 + async def _forward_message(self, msg): return await self.debugpy_client.send_dap_request(msg) @@ -170,25 +200,30 @@ def tcp_client(self): return self.debugpy_client def start(self): - endpoint = self.debugpy_client.debugpy_stream.socket.getsockopt(zmq.LAST_ENDPOINT) - index = endpoit.rfind(':') - port = endpoint[index+1:] + socket = self.debugpy_client.debugpy_stream.socket + socket.bind_to_random_port('tcp://' + self.debugpy_host) + endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode('utf-8') + socket.unbind(endpoint) + index = endpoint.rfind(':') + self.debugpy_port = endpoint[index+1:] code = 'import debugpy;' - code += 'debugpy.listen(("127.0.0.1",' + port + '))' + code += 'debugpy.listen(("' + self.debugpy_host + '",' + self.debugpy_port + '))' content = { 'code': code, - 'slient': True + 'silent': True } self.session.send(self.shell_socket, 'execute_request', content, None, (self.shell_socket.getsockopt(zmq.ROUTING_ID))) - return False + self.session.recv(self.shell_socket, mode=0) + socket.connect(endpoint) + return True def stop(self): # TODO pass - def dumpCell(self, message): + async def dumpCell(self, message): return {} async def setBreakpoints(self, message): @@ -196,7 +231,7 @@ async def setBreakpoints(self, message): self.breakpoint_list[source] = message['arguments']['breakpoints'] return await self._forward_message(message); - def source(self, message): + async def source(self, message): reply = { 'type': 'response', 'request_seq': message['seq'], @@ -268,11 +303,9 @@ async def debugInfo(self, message): 'stoppedThreads': self.stopped_threads } } - #self.log.info("returning reply %s", reply) - print("DEBUGGER: ", reply) return reply - def inspectVariables(self, message): + async def inspectVariables(self, message): # TODO return {} diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 86dfb098c..7bad05013 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -298,9 +298,6 @@ def init_control(self, context): self.debugpy_socket = context.socket(zmq.STREAM) self.debugpy_socket.linger = 1000 - self.debugpy_port = 0 - self.debugpy_port = self._bind_socket(self.debugpy_socket, self.debugpy_port) - self.log.debug("debugpy STREAM Channel on port: %i" % self.debugpy_port) self.debug_shell_socket = context.socket(zmq.DEALER) self.debug_shell_socket.linger = 1000 diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e12d893d0..803987b05 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -179,15 +179,18 @@ def __init__(self, **kwargs): for msg_type in self.control_msg_types: self.control_handlers[msg_type] = getattr(self, msg_type) - self.debugger = Debugger(self.debugpy_stream, + self.debugger = Debugger(self.log, + self.debugpy_stream, self._publish_debug_event, self.debug_shell_socket, self.session) @gen.coroutine def dispatch_debugpy(self, msg): - self.log.debug("Debugpy received: %s", msg) - selg.debugger.tcp_client.receive_dap_frame(msg) + # The first frame is the socket id, we can drop it + frame = msg[1].bytes.decode('utf-8') + self.log.debug("Debugpy received: %s", frame) + self.debugger.tcp_client.receive_dap_frame(frame) @gen.coroutine def dispatch_control(self, msg): @@ -548,7 +551,6 @@ def execute_request(self, stream, ident, parent): ) ) - self.log.debug("EXECUTE_REPLY: %s", reply_content) # Flush output before sending the reply. sys.stdout.flush() sys.stderr.flush() From 95b5aeea424ec0ab1ccb73631e19125860280ae5 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 19 Feb 2021 11:55:17 +0100 Subject: [PATCH 04/14] Implemented breakpoints --- ipykernel/compiler.py | 29 +++++++++++ ipykernel/control.py | 4 +- ipykernel/debugger.py | 106 +++++++++++++++++++++++++--------------- ipykernel/heartbeat.py | 2 + ipykernel/iostream.py | 2 + ipykernel/ipkernel.py | 3 ++ ipykernel/kernelapp.py | 1 + ipykernel/kernelbase.py | 15 ++++++ 8 files changed, 121 insertions(+), 41 deletions(-) create mode 100644 ipykernel/compiler.py diff --git a/ipykernel/compiler.py b/ipykernel/compiler.py new file mode 100644 index 000000000..fbb6cc41d --- /dev/null +++ b/ipykernel/compiler.py @@ -0,0 +1,29 @@ +from IPython.core.compilerop import CachingCompiler +import murmurhash.mrmr + +def get_tmp_directory(): + return '/tmp/ipykernel_debugger/' + +def get_tmp_hash_seed(): + hash_seed = 0xc70f6907 + return hash_seed + +def get_file_name(code): + name = murmurhash.mrmr.hash(code, seed = get_tmp_hash_seed(), murmur_version=2) + return get_tmp_directory() + str(name) + '.py' + +class XCachingCompiler(CachingCompiler): + + def __init__(self, *args, **kwargs): + super(XCachingCompiler, self).__init__(*args, **kwargs) + self.filename_mapper = None + self.log = None + + def get_code_name(self, raw_code, code, number): + filename = get_file_name(raw_code) + + if self.filename_mapper is not None: + self.filename_mapper(filename, number) + + return filename + diff --git a/ipykernel/control.py b/ipykernel/control.py index 10853046d..57a7ac727 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -9,9 +9,11 @@ class ControlThread(Thread): - def __init__(self, **kwargs): + def __init__(self, log=None, **kwargs): Thread.__init__(self, **kwargs) self.io_loop = IOLoop(make_current=False) + self.pydev_do_not_trace = True + self.is_pydev_daemon_thread = True def run(self): self.io_loop.make_current() diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index e55071a0f..efd8a66d3 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -7,6 +7,10 @@ from tornado.queues import Queue from tornado.locks import Event +from .compiler import (get_file_name, get_tmp_directory, get_tmp_hash_seed) + +import debugpy + class DebugpyMessageQueue: HEADER = 'Content-Length: ' @@ -43,43 +47,45 @@ def put_tcp_frame(self, frame): self.tcp_buffer += frame self.log.debug('QUEUE - received frame') - # Finds header - if self.header_pos == -1: - self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER) - if self.header_pos == -1: - return + while True: + # Finds header + if self.header_pos == -1: + self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER) + if self.header_pos == -1: + return - self.log.debug('QUEUE - found header at pos %i', self.header_pos) - - #Finds separator - if self.separator_pos == -1: - hint = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH - self.separator_pos = self.tcp_buffer.find(DebugpyMessageQueue.SEPARATOR, hint) - if self.separator_pos == -1: - return - - self.log.debug('QUEUE - found separator at pos %i', self.separator_pos) - - if self.message_pos == -1: - size_pos = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH - self.message_pos = self.separator_pos + DebugpyMessageQueue.SEPARATOR_LENGTH - self.message_size = int(self.tcp_buffer[size_pos:self.separator_pos]) - - self.log.debug('QUEUE - found message at pos %i', self.message_pos) - self.log.debug('QUEUE - message size is %i', self.message_size) - - if len(self.tcp_buffer) - self.message_pos < self.message_size: - return - - self._put_message(self.tcp_buffer[self.message_pos:self.message_pos + self.message_size]) - if len(self.tcp_buffer) - self.message_pos == self.message_size: - self.log.debug('QUEUE - resetting tcp_buffer') - self.tcp_buffer = '' - self._reset_tcp_pos() - else: - self.log.debug('QUEUE - slicing tcp_buffer') - self.tcp_buffer = self.tcp_buffer[self.message_pos + self.message_size:] - self._reset_tcp_pos() + self.log.debug('QUEUE - found header at pos %i', self.header_pos) + + #Finds separator + if self.separator_pos == -1: + hint = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH + self.separator_pos = self.tcp_buffer.find(DebugpyMessageQueue.SEPARATOR, hint) + if self.separator_pos == -1: + return + + self.log.debug('QUEUE - found separator at pos %i', self.separator_pos) + + if self.message_pos == -1: + size_pos = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH + self.message_pos = self.separator_pos + DebugpyMessageQueue.SEPARATOR_LENGTH + self.message_size = int(self.tcp_buffer[size_pos:self.separator_pos]) + + self.log.debug('QUEUE - found message at pos %i', self.message_pos) + self.log.debug('QUEUE - message size is %i', self.message_size) + + if len(self.tcp_buffer) - self.message_pos < self.message_size: + return + + self._put_message(self.tcp_buffer[self.message_pos:self.message_pos + self.message_size]) + if len(self.tcp_buffer) - self.message_pos == self.message_size: + self.log.debug('QUEUE - resetting tcp_buffer') + self.tcp_buffer = '' + self._reset_tcp_pos() + return + else: + self.tcp_buffer = self.tcp_buffer[self.message_pos + self.message_size:] + self.log.debug('QUEUE - slicing tcp_buffer: %s', self.tcp_buffer) + self._reset_tcp_pos() async def get_message(self): return await self.message_queue.get() @@ -217,6 +223,7 @@ def start(self): self.session.recv(self.shell_socket, mode=0) socket.connect(endpoint) + debugpy.trace_this_thread(False) return True def stop(self): @@ -224,7 +231,22 @@ def stop(self): pass async def dumpCell(self, message): - return {} + code = message['arguments']['code'] + file_name = get_file_name(code) + + with open(file_name, 'w') as f: + f.write(code) + + reply = { + 'type': 'response', + 'request_seq': message['seq'], + 'success': True, + 'command': message['command'], + 'body': { + 'sourcePath': file_name + } + } + return reply async def setBreakpoints(self, message): source = message['arguments']['source']['path']; @@ -244,7 +266,6 @@ async def source(self, message): reply['body'] = { 'content': f.read() } - else: reply['success'] = False reply['message'] = 'source unavailable' @@ -258,9 +279,14 @@ async def stackTrace(self, message): [frame for frame in reply['body']['stackFrames'] if frame['source']['path'] != ''] return reply + def accept_variable(self, variable): + return variable['type'] != 'list' and variable['type'] != 'ZMQExitAutocall' and variable['type'] != 'dict' + async def variables(self, message): reply = await self._forward_message(message) # TODO : check start and count arguments work as expected in debugpy + reply['body']['variables'] = \ + [var for var in reply['body']['variables'] if self.accept_variable(var)] return reply async def attach(self, message): @@ -296,8 +322,8 @@ async def debugInfo(self, message): 'body': { 'isStarted': self.is_started, 'hashMethod': 'Murmur2', - 'hashSeed': 0, - 'tmpFilePrefix': '/tmp/ipykernel_debugger', + 'hashSeed': get_tmp_hash_seed(), + 'tmpFilePrefix': get_tmp_directory(), 'tmpFileSuffix': '.py', 'breakpoints': breakpoint_list, 'stoppedThreads': self.stopped_threads diff --git a/ipykernel/heartbeat.py b/ipykernel/heartbeat.py index 01be21122..6d65a6c7e 100644 --- a/ipykernel/heartbeat.py +++ b/ipykernel/heartbeat.py @@ -40,6 +40,8 @@ def __init__(self, context, addr=None): self.pick_port() self.addr = (self.ip, self.port) self.daemon = True + self.pydev_do_not_trace = True + self.is_pydev_daemon_thread = True def pick_port(self): if self.transport == 'tcp': diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 486e80e27..cb3382604 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -71,6 +71,8 @@ def __init__(self, socket, pipe=False): self._setup_event_pipe() self.thread = threading.Thread(target=self._thread_main) self.thread.daemon = True + self.thread.pydev_do_not_trace = True + self.thread.is_pydev_daemon_thread = True def _thread_main(self): """The inner loop that's actually run in a thread""" diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 62628971a..1939a54f0 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -19,6 +19,8 @@ from .zmqshell import ZMQInteractiveShell from .eventloops import _use_appnope +from .compiler import XCachingCompiler + try: from IPython.core.interactiveshell import _asyncio_runner except ImportError: @@ -71,6 +73,7 @@ def __init__(self, **kwargs): user_module = self.user_module, user_ns = self.user_ns, kernel = self, + compiler_class = XCachingCompiler, ) self.shell.displayhook.session = self.session self.shell.displayhook.pub_socket = self.iopub_socket diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 7bad05013..090e62e05 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -470,6 +470,7 @@ def init_kernel(self): debugpy_stream=debugpy_stream, debug_shell_socket=self.debug_shell_socket, shell_stream=shell_stream, + control_thread=self.control_thread, iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 803987b05..0643fcd7b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -75,6 +75,7 @@ def shell_streams(self): debug_shell_socket = Any() + control_thread = Any() iopub_socket = Any() iopub_thread = Any() stdin_socket = Any() @@ -185,6 +186,10 @@ def __init__(self, **kwargs): self.debug_shell_socket, self.session) + self.control_queue = Queue() + kwargs['control_thread'].io_loop.add_callback(self.poll_control_queue) + + @gen.coroutine def dispatch_debugpy(self, msg): # The first frame is the socket id, we can drop it @@ -194,6 +199,16 @@ def dispatch_debugpy(self, msg): @gen.coroutine def dispatch_control(self, msg): + self.control_queue.put_nowait(msg) + + @gen.coroutine + def poll_control_queue(self): + while True: + msg = yield self.control_queue.get() + yield self.process_control(msg) + + @gen.coroutine + def process_control(self, msg): """dispatch control requests""" idents, msg = self.session.feed_identities(msg, copy=False) try: From 5ec9e348a13ac8e9ec1736bf33649e2c616062e4 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 19 Feb 2021 23:19:23 +0100 Subject: [PATCH 05/14] Fixed negative hash numbers --- ipykernel/compiler.py | 2 ++ ipykernel/debugger.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ipykernel/compiler.py b/ipykernel/compiler.py index fbb6cc41d..85fd68603 100644 --- a/ipykernel/compiler.py +++ b/ipykernel/compiler.py @@ -10,6 +10,8 @@ def get_tmp_hash_seed(): def get_file_name(code): name = murmurhash.mrmr.hash(code, seed = get_tmp_hash_seed(), murmur_version=2) + if name < 0: + name += 2**32 return get_tmp_directory() + str(name) + '.py' class XCachingCompiler(CachingCompiler): diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index efd8a66d3..900c10bf0 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -120,7 +120,6 @@ def _send_request(self, msg): self.log.debug(self.routing_id) self.log.debug(buf) self.debugpy_stream.send_multipart((self.routing_id, buf)) - #self.debugpy_stream.send(buf) # TODO: pass routing_id async def _wait_for_response(self): # Since events are never pushed to the message_queue From 72a9755e62b65e3b8b5a3b7b884cb8bf6e8cf5ef Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Mon, 22 Feb 2021 10:50:23 +0100 Subject: [PATCH 06/14] Handling stop and restart --- ipykernel/debugger.py | 74 +++++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 900c10bf0..8acb65cbf 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -10,6 +10,7 @@ from .compiler import (get_file_name, get_tmp_directory, get_tmp_hash_seed) import debugpy +import time class DebugpyMessageQueue: @@ -96,13 +97,19 @@ class DebugpyClient: def __init__(self, log, debugpy_stream, event_callback): self.log = log self.debugpy_stream = debugpy_stream - self.routing_id = None self.event_callback = event_callback self.message_queue = DebugpyMessageQueue(self._forward_event, self.log) + self.debugpy_host = '127.0.0.1' + self.debugpy_port = -1 + self.routing_id = None self.wait_for_attach = True self.init_event = Event() self.init_event_seq = -1 + def _get_endpoint(self): + host, port = self.get_host_port() + return 'tcp://' + host + ':' + str(port) + def _forward_event(self, msg): if msg['event'] == 'initialized': self.init_event.set() @@ -146,6 +153,28 @@ async def _handle_init_sequence(self): attach_rep = await self._wait_for_response() return attach_rep + def get_host_port(self): + if self.debugpy_port == -1: + socket = self.debugpy_stream.socket + socket.bind_to_random_port('tcp://' + self.debugpy_host) + self.endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode('utf-8') + socket.unbind(self.endpoint) + index = self.endpoint.rfind(':') + self.debugpy_port = self.endpoint[index+1:] + return self.debugpy_host, self.debugpy_port + + + def connect_tcp_socket(self): + self.debugpy_stream.socket.connect(self._get_endpoint()) + self.routing_id = self.debugpy_stream.socket.getsockopt(zmq.ROUTING_ID) + + def disconnect_tcp_socket(self): + self.debugpy_stream.socket.disconnect(self._get_endpoint()) + self.routing_id = None + self.init_event = Event() + self.init_event_seq = -1 + self.wait_for_attach = True + def receive_dap_frame(self, frame): self.message_queue.put_tcp_frame(frame) @@ -194,8 +223,11 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session): self.breakpoint_list = {} self.stopped_threads = [] + self.debugpy_initialized = False + self.debugpy_host = '127.0.0.1' self.debugpy_port = 0 + self.endpoint = None async def _forward_message(self, msg): return await self.debugpy_client.send_dap_request(msg) @@ -205,29 +237,24 @@ def tcp_client(self): return self.debugpy_client def start(self): - socket = self.debugpy_client.debugpy_stream.socket - socket.bind_to_random_port('tcp://' + self.debugpy_host) - endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode('utf-8') - socket.unbind(endpoint) - index = endpoint.rfind(':') - self.debugpy_port = endpoint[index+1:] - code = 'import debugpy;' - code += 'debugpy.listen(("' + self.debugpy_host + '",' + self.debugpy_port + '))' - content = { - 'code': code, - 'silent': True - } - self.session.send(self.shell_socket, 'execute_request', content, - None, (self.shell_socket.getsockopt(zmq.ROUTING_ID))) + if not self.debugpy_initialized: + host, port = self.debugpy_client.get_host_port() + code = 'import debugpy;' + code += 'debugpy.listen(("' + host + '",' + port + '))' + content = { + 'code': code, + 'silent': True + } + self.session.send(self.shell_socket, 'execute_request', content, + None, (self.shell_socket.getsockopt(zmq.ROUTING_ID))) - self.session.recv(self.shell_socket, mode=0) - socket.connect(endpoint) - debugpy.trace_this_thread(False) - return True + ident, msg = self.session.recv(self.shell_socket, mode=0) + self.debugpy_initialized = msg['content']['status'] == 'ok' + self.debugpy_client.connect_tcp_socket() + return self.debugpy_initialized def stop(self): - # TODO - pass + self.debugpy_client.disconnect_tcp_socket() async def dumpCell(self, message): code = message['arguments']['code'] @@ -289,9 +316,10 @@ async def variables(self, message): return reply async def attach(self, message): + host, port = self.debugpy_client.get_host_port() message['arguments']['connect'] = { - 'host': self.debugpy_host, - 'port': self.debugpy_port + 'host': host, + 'port': port } message['arguments']['logToFile'] = True return await self._forward_message(message) From adc8079ce2232328a207cd858501b7d6c984ded5 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Tue, 23 Feb 2021 02:09:24 +0100 Subject: [PATCH 07/14] Create tmporary directory for dumpCell request --- ipykernel/compiler.py | 8 ++++++-- ipykernel/debugger.py | 7 ++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ipykernel/compiler.py b/ipykernel/compiler.py index 85fd68603..44456b8b7 100644 --- a/ipykernel/compiler.py +++ b/ipykernel/compiler.py @@ -1,8 +1,12 @@ from IPython.core.compilerop import CachingCompiler import murmurhash.mrmr +import tempfile +import os def get_tmp_directory(): - return '/tmp/ipykernel_debugger/' + tmp_dir = tempfile.gettempdir() + pid = os.getpid() + return tmp_dir + '/ipykernel_' + str(pid) def get_tmp_hash_seed(): hash_seed = 0xc70f6907 @@ -12,7 +16,7 @@ def get_file_name(code): name = murmurhash.mrmr.hash(code, seed = get_tmp_hash_seed(), murmur_version=2) if name < 0: name += 2**32 - return get_tmp_directory() + str(name) + '.py' + return get_tmp_directory() + '/' + str(name) + '.py' class XCachingCompiler(CachingCompiler): diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 8acb65cbf..3ad8fd94c 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -10,7 +10,6 @@ from .compiler import (get_file_name, get_tmp_directory, get_tmp_hash_seed) import debugpy -import time class DebugpyMessageQueue: @@ -163,7 +162,6 @@ def get_host_port(self): self.debugpy_port = self.endpoint[index+1:] return self.debugpy_host, self.debugpy_port - def connect_tcp_socket(self): self.debugpy_stream.socket.connect(self._get_endpoint()) self.routing_id = self.debugpy_stream.socket.getsockopt(zmq.ROUTING_ID) @@ -238,6 +236,9 @@ def tcp_client(self): def start(self): if not self.debugpy_initialized: + tmp_dir = get_tmp_directory() + if not os.path.exists(tmp_dir): + os.makedirs(tmp_dir) host, port = self.debugpy_client.get_host_port() code = 'import debugpy;' code += 'debugpy.listen(("' + host + '",' + port + '))' @@ -350,7 +351,7 @@ async def debugInfo(self, message): 'isStarted': self.is_started, 'hashMethod': 'Murmur2', 'hashSeed': get_tmp_hash_seed(), - 'tmpFilePrefix': get_tmp_directory(), + 'tmpFilePrefix': get_tmp_directory() + '/', 'tmpFileSuffix': '.py', 'breakpoints': breakpoint_list, 'stoppedThreads': self.stopped_threads From 1b6f09a19ebc2f21d5014a6197bcd97f8bb62f24 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Tue, 23 Feb 2021 09:35:58 +0100 Subject: [PATCH 08/14] Added dependencies in setup.py --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 106eceaa6..e96636919 100644 --- a/setup.py +++ b/setup.py @@ -89,11 +89,13 @@ def run(self): keywords=['Interactive', 'Interpreter', 'Shell', 'Web'], python_requires='>=3.5', install_requires=[ - 'ipython>=5.0.0', + 'debugpy>=1.0.0', + 'ipython>=7.21.0', 'traitlets>=4.1.0', 'jupyter_client', 'tornado>=4.2', 'appnope;platform_system=="Darwin"', + #'murmurhash>=1.0.0' requires https://github.com/explosion/murmurhash/pull/24 ], extras_require={ 'test': [ From 56f618f38d25a669169cdacd34a20d2ab4b5691f Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Mon, 1 Mar 2021 13:42:15 +0100 Subject: [PATCH 09/14] Local implementation of murmur2 hash --- ipykernel/compiler.py | 37 +++++++++++++++++++++++++++++++++---- setup.py | 1 - 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/ipykernel/compiler.py b/ipykernel/compiler.py index 44456b8b7..4c724f146 100644 --- a/ipykernel/compiler.py +++ b/ipykernel/compiler.py @@ -1,8 +1,39 @@ from IPython.core.compilerop import CachingCompiler -import murmurhash.mrmr import tempfile import os +def murmur2_x86(data, seed): + m = 0x5bd1e995 + length = len(data) + h = seed ^ length + rounded_end = (length & 0xfffffffc) + for i in range(0, rounded_end, 4): + k = (ord(data[i]) & 0xff) | ((ord(data[i + 1]) & 0xff) << 8) | \ + ((ord(data[i + 2]) & 0xff) << 16) | (ord(data[i + 3]) << 24) + k = (k * m) & 0xffffffff + k ^= k >> 24 + k = (k * m) & 0xffffffff + + h = (h * m) & 0xffffffff + h ^= k + + val = length & 0x03 + k = 0 + if val == 3: + k = (ord(data[rounded_end + 2]) & 0xff) << 16 + if val in [2, 3]: + k |= (ord(data[rounded_end + 1]) & 0xff) << 8 + if val in [1, 2, 3]: + k |= ord(data[rounded_end]) & 0xff + h ^= k + h = (h * m) & 0xffffffff + + h ^= h >> 13 + h = (h * m) & 0xffffffff + h ^= h >> 15 + + return h + def get_tmp_directory(): tmp_dir = tempfile.gettempdir() pid = os.getpid() @@ -13,9 +44,7 @@ def get_tmp_hash_seed(): return hash_seed def get_file_name(code): - name = murmurhash.mrmr.hash(code, seed = get_tmp_hash_seed(), murmur_version=2) - if name < 0: - name += 2**32 + name = murmur2_x86(code, get_tmp_hash_seed()) return get_tmp_directory() + '/' + str(name) + '.py' class XCachingCompiler(CachingCompiler): diff --git a/setup.py b/setup.py index e96636919..927152151 100644 --- a/setup.py +++ b/setup.py @@ -95,7 +95,6 @@ def run(self): 'jupyter_client', 'tornado>=4.2', 'appnope;platform_system=="Darwin"', - #'murmurhash>=1.0.0' requires https://github.com/explosion/murmurhash/pull/24 ], extras_require={ 'test': [ From 0181d70a18f4b01520c1aba04b2bd4a65c1c7c67 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Mon, 1 Mar 2021 14:04:23 +0100 Subject: [PATCH 10/14] Dropped support for Python 3.6 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7c4898c6f..d4e6425e8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: os: [ubuntu, macos, windows] - python-version: [ '3.6', '3.7', '3.8', '3.9', 'pypy3' ] + python-version: [ '3.7', '3.8', '3.9' ] exclude: - os: windows python-version: pypy3 From 5b4f173cd320366206c72e2d3c5acfb267a33161 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 5 Mar 2021 10:30:52 +0100 Subject: [PATCH 11/14] Removed ipykernel stack frames from the callstack --- ipykernel/debugger.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 3ad8fd94c..8871bdceb 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -302,8 +302,17 @@ async def source(self, message): async def stackTrace(self, message): reply = await self._forward_message(message) - reply['body']['stackFrames'] = \ - [frame for frame in reply['body']['stackFrames'] if frame['source']['path'] != ''] + # We stackFrames array has the following content: + # { frames from the notebook} + # ... + # { 'id': xxx, 'name': '', ... } <= this is the first frame of the code from the notebook + # { frame from ipykernel } + # ... + # {'id': yyy, 'name': '', ... } <= this is the first frame of ipykernel code + # We want to remove all the frames from ipykernel + sf_list = reply['body']['stackFrames'] + module_idx = len(sf_list) - next(i for i, v in enumerate(reversed(sf_list), 1) if v['name'] == '' and i != 1) + reply['body']['stackFrames'] = reply['body']['stackFrames'][:module_idx+1] return reply def accept_variable(self, variable): From 8210c110d5e94a1adc8cb1526754cefddc54484f Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 5 Mar 2021 15:07:40 +0100 Subject: [PATCH 12/14] Add and remove threads Id upon debugger events --- ipykernel/debugger.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 8871bdceb..b40dc4069 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -205,10 +205,11 @@ class Debugger: def __init__(self, log, debugpy_stream, event_callback, shell_socket, session): self.log = log - self.debugpy_client = DebugpyClient(log, debugpy_stream, event_callback) + self.debugpy_client = DebugpyClient(log, debugpy_stream, self._handle_event) self.shell_socket = shell_socket self.session = session self.is_started = False + self.event_callback = event_callback self.started_debug_handlers = {} for msg_type in Debugger.started_debug_msg_types: @@ -227,6 +228,13 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session): self.debugpy_port = 0 self.endpoint = None + def _handle_event(self, msg): + if msg['event'] == 'stopped': + self.stopped_threads.append(msg['body']['threadId']) + elif msg['event'] == 'continued': + self.stopped_threads.remove(msg['body']['threadId']) + self.event_callback(msg) + async def _forward_message(self, msg): return await self.debugpy_client.send_dap_request(msg) @@ -302,11 +310,11 @@ async def source(self, message): async def stackTrace(self, message): reply = await self._forward_message(message) - # We stackFrames array has the following content: + # The stackFrames array has the following content: # { frames from the notebook} # ... # { 'id': xxx, 'name': '', ... } <= this is the first frame of the code from the notebook - # { frame from ipykernel } + # { frames from ipykernel } # ... # {'id': yyy, 'name': '', ... } <= this is the first frame of ipykernel code # We want to remove all the frames from ipykernel From 19d189d225273d4c46fa523d2bd08e0415493d26 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 5 Mar 2021 15:46:34 +0100 Subject: [PATCH 13/14] Filtered out irrelevant variables --- ipykernel/debugger.py | 10 ++++++++-- ipykernel/ipkernel.py | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index b40dc4069..e2bc8fc49 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -232,7 +232,10 @@ def _handle_event(self, msg): if msg['event'] == 'stopped': self.stopped_threads.append(msg['body']['threadId']) elif msg['event'] == 'continued': - self.stopped_threads.remove(msg['body']['threadId']) + try: + self.stopped_threads.remove(msg['body']['threadId']) + except: + pass self.event_callback(msg) async def _forward_message(self, msg): @@ -324,7 +327,10 @@ async def stackTrace(self, message): return reply def accept_variable(self, variable): - return variable['type'] != 'list' and variable['type'] != 'ZMQExitAutocall' and variable['type'] != 'dict' + cond = variable['type'] != 'list' and variable['type'] != 'ZMQExitAutocall' and variable['type'] != 'dict' + cond = cond and variable['name'] not in ['debugpy', 'get_ipython', '_'] + cond = cond and variable['name'][0:2] != '_i' + return cond async def variables(self, message): reply = await self._forward_message(message) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 1939a54f0..23c06f034 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -153,7 +153,8 @@ def set_parent(self, ident, parent, channel='shell'): about the parent message. """ super(IPythonKernel, self).set_parent(ident, parent, channel) - self.shell.set_parent(parent) + if channel == 'shell': + self.shell.set_parent(parent) def init_metadata(self, parent): """Initialize metadata. From 137112e2daeb9eb00e400342853cf0f8402fb138 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Sun, 7 Mar 2021 17:35:48 +0100 Subject: [PATCH 14/14] Fix tests --- ipykernel/control.py | 2 +- ipykernel/kernelapp.py | 8 ++++++-- ipykernel/kernelbase.py | 4 ++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ipykernel/control.py b/ipykernel/control.py index 57a7ac727..7ac56b7f9 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -9,7 +9,7 @@ class ControlThread(Thread): - def __init__(self, log=None, **kwargs): + def __init__(self, **kwargs): Thread.__init__(self, **kwargs) self.io_loop = IOLoop(make_current=False) self.pydev_do_not_trace = True diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 090e62e05..7bf4a54ba 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -301,7 +301,8 @@ def init_control(self, context): self.debug_shell_socket = context.socket(zmq.DEALER) self.debug_shell_socket.linger = 1000 - self.debug_shell_socket.connect(self.shell_socket.getsockopt(zmq.LAST_ENDPOINT)) + if self.shell_socket.getsockopt(zmq.LAST_ENDPOINT): + self.debug_shell_socket.connect(self.shell_socket.getsockopt(zmq.LAST_ENDPOINT)) if hasattr(zmq, 'ROUTER_HANDOVER'): # set router-handover to workaround zeromq reconnect problems @@ -345,7 +346,10 @@ def close(self): self.iopub_thread.stop() self.iopub_thread.close() - self.debug_shell_socket.close() + if self.debugpy_socket and not self.debugpy_socket.closed: + self.debugpy_socket.close() + if self.debug_shell_socket and not self.debug_shell_socket.closed: + self.debug_shell_socket.close() for channel in ('shell', 'control', 'stdin'): self.log.debug("Closing %s channel", channel) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 0643fcd7b..7b2917d23 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -187,8 +187,8 @@ def __init__(self, **kwargs): self.session) self.control_queue = Queue() - kwargs['control_thread'].io_loop.add_callback(self.poll_control_queue) - + if 'control_thread' in kwargs: + kwargs['control_thread'].io_loop.add_callback(self.poll_control_queue) @gen.coroutine def dispatch_debugpy(self, msg):