Conversation
|
@greptile |
Greptile OverviewGreptile SummaryThis PR introduces a DDS (Data Distribution Service) transport layer using CycloneDDS, creating Major changes:
Critical issues:
The implementation correctly addresses previously reported architectural issues by actually publishing to the DDS network and receiving messages through listeners, rather than just using local callbacks. Confidence Score: 2/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant App as Application
participant DT as DDSTransport
participant DDS as DDS (DDSPubSubBase)
participant DS as DDSService
participant DP as DomainParticipant
participant DW as DataWriter
participant DR as DataReader
participant L as _DDSMessageListener
Note over App,L: Initialization
App->>DT: __init__(topic, type)
DT->>DT: Create Topic(name, typename)
DT->>DDS: Create DDS instance
App->>DT: start()
DT->>DDS: start()
DDS->>DS: get_participant()
DS->>DP: Create DomainParticipant (lazy init)
Note over App,L: Publishing Flow
App->>DT: broadcast(msg)
DT->>DDS: publish(topic, msg)
DDS->>DDS: _get_writer(topic)
DDS->>DW: Create DataWriter (if needed)
DDS->>DW: write(message)
DW->>DP: Send to DDS network
Note over App,L: Subscription Flow
App->>DT: subscribe(callback)
DT->>DDS: subscribe(topic, wrapped_callback)
DDS->>DDS: _get_reader(topic)
DDS->>L: Create _DDSMessageListener
DDS->>DR: Create DataReader with listener
Note over App,L: Message Reception
DP->>DR: Incoming DDS message
DR->>L: on_data_available(reader)
L->>DR: take() samples
L->>L: Iterate over samples
L->>App: callback(sample, topic)
|
|
@greptile |
|
@greptile |
|
@greptile |
|
@greptile |
* raw rospubsub and benchmarks * typefixes, shm added to the benchmark * SHM is not so important to tell us every time when it starts * greptile comments * Add co-authorship line to commit message filter patterns * Remove unused contextmanager import --------- Co-authored-by: Ivan Nikolic <lesh@sysphere.org>
Replace base64 string encoding with native IDL bytearray type to eliminate buffer overflow issues. The original base64 encoding exceeded CycloneDDS's default string size limit (~256 bytes) and caused crashes on messages >= 1KB. Key changes: - Use make_idl_struct with bytearray field instead of string - Convert bytes to bytearray when publishing to DDS - Convert bytearray back to bytes when receiving from DDS - Add _DDSMessageListener for async message dispatch - Implement thread-safe DataWriter/DataReader management - Add pickle support via __getstate__/__setstate__ Result: All 12 DDS benchmark tests pass (64B to 10MB messages).
|
@greptile |
| class Message(IdlStruct): | ||
| """DDS message with binary data payload following IdlStruct format.""" | ||
|
|
||
| payload: str | ||
|
|
||
| def dds_encode(self) -> bytes: | ||
| """Encode message to bytes for DDS transmission.""" | ||
| return self.payload.encode("latin-1") | ||
|
|
||
| @classmethod | ||
| def dds_decode(cls, data: bytes) -> "Message": | ||
| """Decode bytes back to Message instance.""" | ||
| return cls(payload=data.decode("latin-1")) | ||
|
|
There was a problem hiding this comment.
logic: Test Message class uses string encoding which breaks with binary data. The make_data() function generates binary bytes, but encoding as latin-1 string will fail for many byte sequences.
DDS IdlStruct should use bytes type directly:
| class Message(IdlStruct): | |
| """DDS message with binary data payload following IdlStruct format.""" | |
| payload: str | |
| def dds_encode(self) -> bytes: | |
| """Encode message to bytes for DDS transmission.""" | |
| return self.payload.encode("latin-1") | |
| @classmethod | |
| def dds_decode(cls, data: bytes) -> "Message": | |
| """Decode bytes back to Message instance.""" | |
| return cls(payload=data.decode("latin-1")) | |
| @dataclass | |
| class Message(IdlStruct): | |
| """DDS message with binary data payload following IdlStruct format.""" | |
| payload: bytes |
Remove the encode/decode methods as CycloneDDS handles serialization of IdlStruct fields automatically.
…or Message payload.
The double-checked locking pattern avoids lock contention on every call after initial object creation. Initial benchmarking shows this pattern performs better than simple locking for repeated accesses to the same topics.
|
@greptile |
Additional Comments (2)
|
| @@ -0,0 +1,213 @@ | |||
| #!/usr/bin/env python3 | |||
There was a problem hiding this comment.
This file in on dev now. You might want to rebase onto dev so that it it disappears from the diff of this PR.
Related: #1033, #1058