diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index 6539cfefdb..10f8763496 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -159,7 +159,8 @@ def publish(self, topic: str, message: bytes) -> None: raise ValueError(f"Payload too large: {L} > capacity {st.capacity}") # Mark this payload to suppress its single echo (handles back-to-back publishes) - st.suppress_counts[payload_bytes] += 1 + payload_hash = hashlib.md5(payload_bytes).digest() + st.suppress_counts[payload_hash] += 1 # Synchronous local delivery first (zero extra copies) for cb in list(st.subs): @@ -276,12 +277,13 @@ def _fanout_loop(self, topic: str, st: _TopicState) -> None: payload = host[4 : 4 + L].tobytes() # Drop exactly the number of local echoes we created - cnt = st.suppress_counts.get(payload, 0) + payload_hash = hashlib.md5(payload).digest() + cnt = st.suppress_counts.get(payload_hash, 0) if cnt > 0: if cnt == 1: - del st.suppress_counts[payload] + del st.suppress_counts[payload_hash] else: - st.suppress_counts[payload] = cnt - 1 + st.suppress_counts[payload_hash] = cnt - 1 continue # suppressed except Exception: