Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions qcodes/data/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from queue import Empty
from traceback import format_exc

from qcodes.utils.multiprocessing import ServerManager
from qcodes.utils.multiprocessing import ServerManager, SERVER_ERR


def get_data_manager(only_existing=False):
Expand Down Expand Up @@ -42,7 +42,7 @@ class DataManager(ServerManager):
Written using multiprocessing Queue's, but should be easily
extensible to other messaging systems
'''
def __init__(self, query_timeout=2):
def __init__(self):
type(self).default = self
super().__init__(name='DataServer', server_class=DataServer)

Expand Down Expand Up @@ -122,7 +122,9 @@ def _reply(self, response):

def _post_error(self, e):
self._error_queue.put(format_exc())
self._response_queue.put('ERR') # to short-circuit the timeout
# the caller is waiting on _response_queue, so put a signal there
# to say there's an error coming
self._response_queue.put(SERVER_ERR)

######################################################################
# query handlers #
Expand Down
9 changes: 4 additions & 5 deletions qcodes/instrument/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ class InstrumentConnection:
def __init__(self, manager, instrument_class, new_id, args, kwargs):
self.manager = manager

# long timeout on the initial call, to allow slow errors
# (like visa timeout) to get back to us
info = manager.ask('new', instrument_class, new_id, args, kwargs,
timeout=20)
info = manager.ask('new', instrument_class, new_id, args, kwargs)
for k, v in info.items():
setattr(self, k, v)

Expand Down Expand Up @@ -190,7 +187,9 @@ def post_error(self, e, query=None):
if query:
e.args = e.args + ('error processing query ' + repr(query),)
self._error_queue.put(format_exc())
self._response_queue.put(SERVER_ERR) # to short-circuit timeout
# the caller is waiting on _response_queue, so put a signal there
# to say there's an error coming
self._response_queue.put(SERVER_ERR)

def handle_halt(self, *args, **kwargs):
'''
Expand Down
3 changes: 2 additions & 1 deletion qcodes/tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ def test_mechanics(self):
' OSError: your hard disk went floppy.')
sm._error_queue.put(builtin_error_str)
sm._response_queue.put(SERVER_ERR)
time.sleep(0.005)
while sm._error_queue.empty() or sm._response_queue.empty():
time.sleep(0.005)
with self.assertRaises(OSError):
sm.ask('which way does the wind blow?')

Expand Down
23 changes: 15 additions & 8 deletions qcodes/utils/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ class ServerManager:

name: the name of the server. Can include .format specs to insert
all or part of the uuid
query_timeout: the default time to wait for responses
query_timeout: (default None) the default time to wait for responses
kwargs: passed along to the server constructor
'''
def __init__(self, name, server_class, shared_attrs=None, query_timeout=2):
def __init__(self, name, server_class, shared_attrs=None,
query_timeout=None):
self._query_queue = mp.Queue()
self._response_queue = mp.Queue()
self._error_queue = mp.Queue()
Expand Down Expand Up @@ -276,7 +277,7 @@ def write(self, *query):
self._query_queue.put(query)
self._check_for_errors()

def _check_for_errors(self, expect_error=False):
def _check_for_errors(self, expect_error=False, query=None):
if expect_error or not self._error_queue.empty():
# clear the response queue whenever there's an error
# and give it a little time to flush first
Expand All @@ -297,12 +298,18 @@ def _check_for_errors(self, expect_error=False):
if err_type is None or not issubclass(err_type, Exception):
err_type = RuntimeError

if query:
errhead += '\nwhile executing query: ' + repr(query)

raise err_type(errhead + '\n\n' + errstr)

def _check_response(self, timeout):
def _check_response(self, timeout, query=None):
res = self._response_queue.get(timeout=timeout)
if res == SERVER_ERR:
self._expect_error = True
# TODO: I think the way we're doing this now, I could get rid of
# _error_queue completely and just have errors and regular
# responses labeled differently in _response_queue
self._check_for_errors(expect_error=True, query=query)
return res

def ask(self, *query, timeout=None):
Expand All @@ -323,16 +330,16 @@ def ask(self, *query, timeout=None):
self._query_queue.put(query)

try:
res = self._check_response(timeout)
res = self._check_response(timeout, query)

while not self._response_queue.empty():
res = self._check_response(timeout)
res = self._check_response(timeout, query)

except Empty as e:
if self._error_queue.empty():
# only raise if we're not about to find a deeper error
raise e
self._check_for_errors(self._expect_error)
self._check_for_errors(query=query)

return res

Expand Down