Skip to content
2 changes: 1 addition & 1 deletion ipykernel/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version_info = (5, 6, 0, 'dev0')
version_info = (6, 0, 0, 'dev0')
__version__ = '.'.join(map(str, version_info[:3]))

# pep440 is annoying, beta/alpha/rc should _not_ have dots or pip/setuptools
Expand Down
19 changes: 19 additions & 0 deletions ipykernel/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from threading import Thread
import zmq
if zmq.pyzmq_version_info() >= (17, 0):
from tornado.ioloop import IOLoop
else:
# deprecated since pyzmq 17
from zmq.eventloop.ioloop import IOLoop


class ControlThread(Thread):

def __init__(self, **kwargs):
Thread.__init__(self, **kwargs)
self.io_loop = IOLoop(make_current=False)

def run(self):
self.io_loop.make_current()
self.io_loop.start()
self.io_loop.close(all_fds=True)
41 changes: 16 additions & 25 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ def loop_qt4(kernel):

kernel.app = get_app_qt4([" "])
kernel.app.setQuitOnLastWindowClosed(False)

# Only register the eventloop for the shell stream because doing
# it for the control stream is generating a bunch of unnecessary
# warnings on Windows.
_notify_stream_qt(kernel, kernel.shell_streams[0])
_notify_stream_qt(kernel, kernel.shell_stream)

_loop_qt(kernel.app)

Expand Down Expand Up @@ -160,10 +156,9 @@ def loop_wx(kernel):

def wake():
"""wake from wx"""
for stream in kernel.shell_streams:
if stream.flush(limit=1):
kernel.app.ExitMainLoop()
return
if kernel.shell_stream.flush(limit=1):
kernel.app.ExitMainLoop()
return

# We have to put the wx.Timer in a wx.Frame for it to fire properly.
# We make the Frame hidden when we create it in the main app below.
Expand Down Expand Up @@ -237,13 +232,12 @@ def process_stream_events(stream, *a, **kw):
# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app_wrapper = BasicAppWrapper(app)

for stream in kernel.shell_streams:
notifier = partial(process_stream_events, stream)
# seems to be needed for tk
notifier.__name__ = "notifier"
app.tk.createfilehandler(stream.getsockopt(zmq.FD), READABLE, notifier)
# schedule initial call after start
app.after(0, notifier)
notifier = partial(process_stream_events, shell_stream)
# seems to be needed for tk
notifier.__name__ = "notifier"
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, notifier)
# schedule initial call after start
app.after(0, notifier)

app.mainloop()

Expand Down Expand Up @@ -330,10 +324,9 @@ def handle_int(etype, value, tb):
# don't let interrupts during mainloop invoke crash_handler:
sys.excepthook = handle_int
mainloop(kernel._poll_interval)
for stream in kernel.shell_streams:
if stream.flush(limit=1):
# events to process, return control to kernel
return
if kernel_shell_stream.flush(limit=1):
# events to process, return control to kernel
return
except:
raise
except KeyboardInterrupt:
Expand Down Expand Up @@ -371,11 +364,9 @@ def process_stream_events(stream):
if stream.flush(limit=1):
loop.stop()

for stream in kernel.shell_streams:
fd = stream.getsockopt(zmq.FD)
notifier = partial(process_stream_events, stream)
loop.add_reader(fd, notifier)
loop.call_soon(notifier)
notifier = partial(process_stream_events, shell_stream)
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
loop.call_soon(notifier)

while True:
error = None
Expand Down
5 changes: 2 additions & 3 deletions ipykernel/inprocess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#-----------------------------------------------------------------------------

# IPython imports
from ipykernel.inprocess.socket import DummySocket
from traitlets import Type, Instance, default
from jupyter_client.clientabc import KernelClientABC
from jupyter_client.client import KernelClient
Expand Down Expand Up @@ -171,10 +170,10 @@ def _dispatch_to_kernel(self, msg):
if kernel is None:
raise RuntimeError('Cannot send request. No kernel exists.')

stream = DummySocket()
stream = kernel.shell_stream
self.session.send(stream, msg)
msg_parts = stream.recv_multipart()
kernel.dispatch_shell(stream, msg_parts)
kernel.dispatch_shell(msg_parts)

idents, reply_msg = self.session.recv(stream, copy=False)
self.shell_channel.call_handlers_later(reply_msg)
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/inprocess/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ class InProcessKernel(IPythonKernel):
#-------------------------------------------------------------------------

shell_class = Type(allow_none=True)
shell_streams = List()
control_stream = Any()
_underlying_iopub_socket = Instance(DummySocket, ())
iopub_thread = Instance(IOPubThread)

shell_stream = Instance(DummySocket, ())

@default('iopub_thread')
def _default_iopub_thread(self):
thread = IOPubThread(self._underlying_iopub_socket)
Expand Down
12 changes: 10 additions & 2 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sys
import threading
import warnings
from weakref import WeakSet
from io import StringIO, TextIOBase

import zmq
Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(self, socket, pipe=False):
self._setup_pipe_in()
self._local = threading.local()
self._events = deque()
self._event_pipes = WeakSet()
self._setup_event_pipe()
self.thread = threading.Thread(target=self._thread_main)
self.thread.daemon = True
Expand Down Expand Up @@ -100,6 +102,9 @@ def _event_pipe(self):
event_pipe.linger = 0
event_pipe.connect(self._event_interface)
self._local.event_pipe = event_pipe
# WeakSet so that event pipes will be closed by garbage collection
# when their threads are terminated
self._event_pipes.add(event_pipe)
return event_pipe

def _handle_event(self, msg):
Expand Down Expand Up @@ -179,8 +184,11 @@ def stop(self):
return
self.io_loop.add_callback(self.io_loop.stop)
self.thread.join()
if hasattr(self._local, 'event_pipe'):
self._local.event_pipe.close()
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
for event_pipe in self._event_pipes:
event_pipe.close()

def close(self):
if self.closed:
Expand Down
6 changes: 3 additions & 3 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ def start(self):
self.shell.exit_now = False
super(IPythonKernel, self).start()

def set_parent(self, ident, parent):
def set_parent(self, ident, parent, channel='shell'):
"""Overridden from parent to tell the display hook and output streams
about the parent message.
"""
super(IPythonKernel, self).set_parent(ident, parent)
super(IPythonKernel, self).set_parent(ident, parent, channel)
self.shell.set_parent(parent)

def init_metadata(self, parent):
Expand Down Expand Up @@ -509,7 +509,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
reply_content['engine_info'] = e_info

self.send_response(self.iopub_socket, 'error', reply_content,
ident=self._topic('error'))
ident=self._topic('error'), channel='shell')
self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
result_buf = []
reply_content['status'] = 'error'
Expand Down
25 changes: 18 additions & 7 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

# local imports
from .iostream import IOPubThread
from .control import ControlThread
from .heartbeat import Heartbeat
from .ipkernel import IPythonKernel
from .parentpoller import ParentPollerUnix, ParentPollerWindows
Expand Down Expand Up @@ -124,6 +125,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
stdin_socket = Any()
iopub_socket = Any()
iopub_thread = Any()
control_thread = Any()

ports = Dict()

Expand Down Expand Up @@ -276,6 +278,17 @@ def init_sockets(self):
self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)

if hasattr(zmq, 'ROUTER_HANDOVER'):
# set router-handover to workaround zeromq reconnect problems
# in certain rare circumstances
# see ipython/ipykernel#270 and zeromq/libzmq#2892
self.shell_socket.router_handover = \
self.stdin_socket.router_handover = 1

self.init_control(context)
self.init_iopub(context)

def init_control(self, context):
self.control_socket = context.socket(zmq.ROUTER)
self.control_socket.linger = 1000
self.control_port = self._bind_socket(self.control_socket, self.control_port)
Expand All @@ -285,11 +298,9 @@ def init_sockets(self):
# set router-handover to workaround zeromq reconnect problems
# in certain rare circumstances
# see ipython/ipykernel#270 and zeromq/libzmq#2892
self.shell_socket.router_handover = \
self.control_socket.router_handover = \
self.stdin_socket.router_handover = 1
self.control_socket.router_handover = 1

self.init_iopub(context)
self.control_thread = ControlThread(daemon=True)

def init_iopub(self, context):
self.iopub_socket = context.socket(zmq.PUB)
Expand Down Expand Up @@ -437,13 +448,13 @@ def init_signal(self):
def init_kernel(self):
"""Create the Kernel object itself"""
shell_stream = ZMQStream(self.shell_socket)
control_stream = ZMQStream(self.control_socket)

control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop)
self.control_thread.start()
kernel_factory = self.kernel_class.instance

kernel = kernel_factory(parent=self, session=self.session,
control_stream=control_stream,
shell_streams=[shell_stream, control_stream],
shell_stream=shell_stream,
iopub_thread=self.iopub_thread,
iopub_socket=self.iopub_socket,
stdin_socket=self.stdin_socket,
Expand Down
Loading