From ebee611c1fb6d84af8964e19e98b1b589de3d56d Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Mon, 19 May 2025 18:30:09 -0700 Subject: [PATCH 1/2] remove LH callbacks --- pylabrobot/liquid_handling/liquid_handler.py | 167 +++---------------- 1 file changed, 24 insertions(+), 143 deletions(-) diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index 7179f72f542..eb7c5a0c8a1 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -20,6 +20,7 @@ Sequence, Set, Tuple, + Type, Union, cast, ) @@ -61,6 +62,7 @@ from pylabrobot.resources.liquid import Liquid from pylabrobot.resources.rotation import Rotation from pylabrobot.tilting.tilter import Tilter +from pylabrobot.error_handling import handles_errors from .backends import LiquidHandlerBackend from .standard import ( @@ -111,18 +113,6 @@ class LiquidHandler(Resource, Machine): defined in `pyhamilton.liquid_handling.backends`) to communicate with the liquid handler. """ - ALLOWED_CALLBACKS = { - "aspirate", - "aspirate96", - "dispense", - "dispense96", - "drop_tips", - "drop_tips96", - "move_resource", - "pick_up_tips", - "pick_up_tips96", - } - def __init__(self, backend: LiquidHandlerBackend, deck: Deck): """Initialize a LiquidHandler. @@ -142,7 +132,6 @@ def __init__(self, backend: LiquidHandlerBackend, deck: Deck): Machine.__init__(self, backend=backend) self.backend: LiquidHandlerBackend = backend # fix type - self._callbacks: Dict[str, OperationCallback] = {} self.deck = deck # register callbacks for sending resource assignment/unassignment to backend @@ -161,6 +150,8 @@ def __init__(self, backend: LiquidHandlerBackend, deck: Deck): self._resource_pickup: Optional[ResourcePickup] = None + self._error_handlers: Dict[Type[Exception], Callable] = {} + async def setup(self, **backend_kwargs): """Prepare the robot for use.""" @@ -347,7 +338,8 @@ def _make_sure_channels_exist(self, channels: List[int]): if not len(invalid_channels) == 0: raise ValueError(f"Invalid channels: {invalid_channels}") - @need_setup_finished + @handles_errors + # @need_setup_finished async def pick_up_tips( self, tip_spots: List[TipSpot], @@ -463,15 +455,8 @@ async def pick_up_tips( (op.resource.tracker.commit if success else op.resource.tracker.rollback)() (self.head[channel].commit if success else self.head[channel].rollback)() - # trigger callback - self._trigger_callback( - "pick_up_tips", - liquid_handler=self, - operations=pickups, - use_channels=use_channels, - error=error, - **backend_kwargs, - ) + if error is not None: + raise error @need_setup_finished async def drop_tips( @@ -599,15 +584,8 @@ async def drop_tips( (op.resource.tracker.commit if success else op.resource.tracker.rollback)() (self.head[channel].commit if success else self.head[channel].rollback)() - # trigger callback - self._trigger_callback( - "drop_tips", - liquid_handler=self, - operations=drops, - use_channels=use_channels, - error=error, - **backend_kwargs, - ) + if error is not None: + raise error async def return_tips( self, @@ -913,15 +891,8 @@ async def aspirate( tip_volume_tracker = self.head[channel].get_tip().tracker (tip_volume_tracker.commit if success else tip_volume_tracker.rollback)() - # trigger callback - self._trigger_callback( - "aspirate", - liquid_handler=self, - operations=aspirations, - use_channels=use_channels, - error=error, - **backend_kwargs, - ) + if error is not None: + raise error @need_setup_finished async def dispense( @@ -1123,15 +1094,8 @@ async def dispense( if any(bav is not None for bav in blow_out_air_volume): self._blow_out_air_volume = None - # trigger callback - self._trigger_callback( - "dispense", - liquid_handler=self, - operations=dispenses, - use_channels=use_channels, - error=error, - **backend_kwargs, - ) + if error is not None: + raise error async def transfer( self, @@ -1212,6 +1176,9 @@ async def transfer( **backend_kwargs, ) + if error is not None: + raise error + @contextlib.contextmanager def use_channels(self, channels: List[int]): """Temporarily use the specified channels as a default argument to `use_channels`. @@ -1287,25 +1254,13 @@ async def pick_up_tips96( if does_tip_tracking() and not tip_spot.tracker.is_disabled: tip_spot.tracker.rollback() self.head96[i].rollback() - self._trigger_callback( - "pick_up_tips96", - liquid_handler=self, - pickup=pickup_operation, - error=error, - **backend_kwargs, - ) + + raise error else: for i, tip_spot in enumerate(tip_rack.get_all_items()): if does_tip_tracking() and not tip_spot.tracker.is_disabled: tip_spot.tracker.commit() self.head96[i].commit() - self._trigger_callback( - "pick_up_tips96", - liquid_handler=self, - pickup=pickup_operation, - error=None, - **backend_kwargs, - ) async def drop_tips96( self, @@ -1367,13 +1322,8 @@ async def drop_tips96( if does_tip_tracking() and not tip_spot.tracker.is_disabled: tip_spot.tracker.rollback() self.head96[i].rollback() - self._trigger_callback( - "drop_tips96", - liquid_handler=self, - drop=drop_operation, - error=e, - **backend_kwargs, - ) + + raise error else: for i in range(96): if isinstance(resource, TipRack): @@ -1381,13 +1331,6 @@ async def drop_tips96( if does_tip_tracking() and not tip_spot.tracker.is_disabled: tip_spot.tracker.commit() self.head96[i].commit() - self._trigger_callback( - "drop_tips96", - liquid_handler=self, - drop=drop_operation, - error=None, - **backend_kwargs, - ) def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]: """Get the tip rack where the tips on the 96 head were picked up. If no tips were picked up, @@ -1589,27 +1532,14 @@ async def aspirate96( if does_volume_tracking() and not container.tracker.is_disabled: container.tracker.rollback() channel.get_tip().tracker.rollback() - self._trigger_callback( - "aspirate96", - liquid_handler=self, - aspiration=aspiration, - error=error, - **backend_kwargs, - ) + + raise error else: for channel, container in zip(self.head96.values(), containers): if does_volume_tracking() and not container.tracker.is_disabled: container.tracker.commit() channel.get_tip().tracker.commit() - self._trigger_callback( - "aspirate96", - liquid_handler=self, - aspiration=aspiration, - error=None, - **backend_kwargs, - ) - async def dispense96( self, resource: Union[Plate, Container, List[Well]], @@ -1738,27 +1668,13 @@ async def dispense96( container.tracker.rollback() channel.get_tip().tracker.rollback() - self._trigger_callback( - "dispense96", - liquid_handler=self, - dispense=dispense, - error=error, - **backend_kwargs, - ) + raise error else: for channel, container in zip(self.head96.values(), containers): if does_volume_tracking() and not well.tracker.is_disabled: container.tracker.commit() channel.get_tip().tracker.commit() - self._trigger_callback( - "dispense96", - liquid_handler=self, - dispense=dispense, - error=None, - **backend_kwargs, - ) - async def stamp( self, source: Plate, # TODO @@ -2186,36 +2102,6 @@ async def move_plate( **backend_kwargs, ) - def register_callback(self, method_name: str, callback: OperationCallback): - """Registers a callback for a specific method.""" - if method_name in self._callbacks: - error_message = f"Callback already registered for: {method_name}" - raise RuntimeError(error_message) - if method_name not in self.ALLOWED_CALLBACKS: - error_message = f"Callback not allowed: {method_name}" - raise RuntimeError(error_message) - self._callbacks[method_name] = callback - - def _trigger_callback( - self, - method_name: str, - *args, - error: Optional[Exception] = None, - **kwargs, - ): - """Triggers the callback associated with a method, if any. - - NB: If an error exists it will be passed to the callback instead of being raised. - """ - if callback := self._callbacks.get(method_name): - callback(self, *args, error=error, **kwargs) - elif error is not None: - raise error - - @property - def callbacks(self): - return self._callbacks - def serialize(self): return {**Resource.serialize(self), **Machine.serialize(self)} @@ -2275,8 +2161,3 @@ def assign_child_resource( "Cannot assign child resource to liquid handler. Use " "lh.deck.assign_child_resource() instead." ) - - -class OperationCallback(Protocol): - def __call__(self, handler: "LiquidHandler", *args: Any, **kwargs: Any) -> None: - ... # pragma: no cover From 16f438e741d94fbf0d85e64573130e4a0196ac92 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Mon, 19 May 2025 20:49:22 -0700 Subject: [PATCH 2/2] PoC --- pylabrobot/error_handling/__init__.py | 2 ++ pylabrobot/error_handling/handles_errors.py | 26 ++++++++++++++++++++ pylabrobot/error_handling/serial_handler.py | 13 ++++++++++ pylabrobot/liquid_handling/error_handlers.py | 23 +++++++++++++++++ pylabrobot/liquid_handling/liquid_handler.py | 15 +++++++++++ 5 files changed, 79 insertions(+) create mode 100644 pylabrobot/error_handling/__init__.py create mode 100644 pylabrobot/error_handling/handles_errors.py create mode 100644 pylabrobot/error_handling/serial_handler.py create mode 100644 pylabrobot/liquid_handling/error_handlers.py diff --git a/pylabrobot/error_handling/__init__.py b/pylabrobot/error_handling/__init__.py new file mode 100644 index 00000000000..eec0a5b8c91 --- /dev/null +++ b/pylabrobot/error_handling/__init__.py @@ -0,0 +1,2 @@ +from .handles_errors import handles_errors +from .serial_handler import SerialErrorHandler diff --git a/pylabrobot/error_handling/handles_errors.py b/pylabrobot/error_handling/handles_errors.py new file mode 100644 index 00000000000..7400f3aae4b --- /dev/null +++ b/pylabrobot/error_handling/handles_errors.py @@ -0,0 +1,26 @@ +import functools +import inspect + +def handles_errors(func): + @functools.wraps(func) + async def wrapper(self, *args, **kwargs): + try: + return await func(self, *args, **kwargs) + except Exception as error: + handler = self._error_handlers.get(type(error)) + if handler: + print(f"Handling error {error} with: {handler}") + # bind the wrapper to this instance so that + # retries still go through the decorator + bound = wrapper.__get__(self, type(self)) + + # convert all args to kwargs, remove self + sig = inspect.signature(func) + bound_args = sig.bind(self, *args, **kwargs) + bound_args = {k: v for k, v in bound_args.arguments.items() if k != "self"} + + # call the handler, passing it the *decorated* method + return await handler(bound, error, **bound_args) + # no handler registered -> re‑raise + raise + return wrapper diff --git a/pylabrobot/error_handling/serial_handler.py b/pylabrobot/error_handling/serial_handler.py new file mode 100644 index 00000000000..700b0d04863 --- /dev/null +++ b/pylabrobot/error_handling/serial_handler.py @@ -0,0 +1,13 @@ +class SerialErrorHandler: + def __init__(self, child_handlers: list): + self.child_handlers = child_handlers + self.fallback = fallback + self.index = 0 + + def __call__(self, func, *args, **kwargs): + print("serial error handler is choosing next child handler") + if self.index >= len(self.child_handlers): + raise RuntimeError("No more child handlers to call") + handler = self.child_handlers[self.index] + self.index += 1 + return handler(func, *args, **kwargs) diff --git a/pylabrobot/liquid_handling/error_handlers.py b/pylabrobot/liquid_handling/error_handlers.py new file mode 100644 index 00000000000..489350a5f45 --- /dev/null +++ b/pylabrobot/liquid_handling/error_handlers.py @@ -0,0 +1,23 @@ +from pylabrobot.liquid_handling.errors import ChannelizedError + +def try_next_tip_spot(try_tip_spots): + async def handler(func, error: Exception, **kwargs): + assert isinstance(error, ChannelizedError) + + new_tip_spots, new_use_channels = [], [] + + tip_spots = kwargs.pop("tip_spots") + if "use_channels" not in kwargs: + use_channels = list(range(len(tip_spots))) + else: + use_channels = kwargs.pop("use_channels") + + for idx, channel_idx in zip(tip_spots, use_channels): + if channel_idx in error.errors.keys(): + new_tip_spots.append(next(try_tip_spots)) + new_use_channels.append(channel_idx) + + print(f"Retrying with tip spots: {new_tip_spots} and use channels: {new_use_channels}") + return await func(tip_spots=new_tip_spots, use_channels=new_use_channels, **kwargs) + + return handler diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index eb7c5a0c8a1..4961a5a44f9 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -2147,6 +2147,21 @@ async def move_channel_z(self, channel: int, z: float): """Move channel to absolute z position""" assert 0 <= channel < self.backend.num_channels, f"Invalid channel: {channel}" await self.backend.move_channel_z(channel=channel, z=z) + + @contextlib.contextmanager + def on_fail(self, error_cls: Type[Exception], handler: Callable): + """Register a handler to be called when an error occurs. + + Args: + error_cls: The exception class to handle. + handler: The handler function to call. + """ + + self._error_handlers[error_cls] = handler + try: + yield + finally: + del self._error_handlers[error_cls] # -- Resource methods --