Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,51 +182,4 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) ->
return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg))


class DaskTransport(Transport[T]):
subscribers: list[Callable[[T], None]]
_started: bool = False

def __init__(self) -> None:
self.subscribers = []

def __str__(self) -> str:
return colors.yellow("DaskTransport")

def __reduce__(self):
return (DaskTransport, ())

def broadcast(self, selfstream: RemoteIn[T], msg: T) -> None:
for subscriber in self.subscribers:
# there is some sort of a bug here with losing worker loop
# print(subscriber.owner, subscriber.owner._worker, subscriber.owner._client)
# subscriber.owner._try_bind_worker_client()
# print(subscriber.owner, subscriber.owner._worker, subscriber.owner._client)

subscriber.owner.dask_receive_msg(subscriber.name, msg).result()

def dask_receive_msg(self, msg) -> None:
for subscriber in self.subscribers:
try:
subscriber(msg)
except Exception as e:
print(
colors.red("Error in DaskTransport subscriber callback:"),
e,
traceback.format_exc(),
)

# for outputs
def dask_register_subscriber(self, remoteInput: RemoteIn[T]) -> None:
self.subscribers.append(remoteInput)

# for inputs
def subscribe(self, callback: Callable[[T], None], selfstream: In[T]) -> None:
if not self._started:
selfstream.connection.owner.dask_register_subscriber(
selfstream.connection.name, selfstream
).result()
self._started = True
self.subscribers.append(callback)


class ZenohTransport(PubSubTransport[T]): ...
Loading