From 68d41e7ff4b6e7338d9216f12ece9e122dfd90b7 Mon Sep 17 00:00:00 2001 From: ym-han <21286812+ym-han@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:26:25 +0000 Subject: [PATCH] transport: Remove DaskTransport dead code --- dimos/core/transport.py | 47 ----------------------------------------- 1 file changed, 47 deletions(-) diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 5cd05460f0..28773487be 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -182,51 +182,4 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) -class DaskTransport(Transport[T]): - subscribers: list[Callable[[T], None]] - _started: bool = False - - def __init__(self) -> None: - self.subscribers = [] - - def __str__(self) -> str: - return colors.yellow("DaskTransport") - - def __reduce__(self): - return (DaskTransport, ()) - - def broadcast(self, selfstream: RemoteIn[T], msg: T) -> None: - for subscriber in self.subscribers: - # there is some sort of a bug here with losing worker loop - # print(subscriber.owner, subscriber.owner._worker, subscriber.owner._client) - # subscriber.owner._try_bind_worker_client() - # print(subscriber.owner, subscriber.owner._worker, subscriber.owner._client) - - subscriber.owner.dask_receive_msg(subscriber.name, msg).result() - - def dask_receive_msg(self, msg) -> None: - for subscriber in self.subscribers: - try: - subscriber(msg) - except Exception as e: - print( - colors.red("Error in DaskTransport subscriber callback:"), - e, - traceback.format_exc(), - ) - - # for outputs - def dask_register_subscriber(self, remoteInput: RemoteIn[T]) -> None: - self.subscribers.append(remoteInput) - - # for inputs - def subscribe(self, callback: Callable[[T], None], selfstream: In[T]) -> None: - if not self._started: - selfstream.connection.owner.dask_register_subscriber( - selfstream.connection.name, selfstream - ).result() - self._started = True - self.subscribers.append(callback) - - class ZenohTransport(PubSubTransport[T]): ...