diff --git a/docs/api/sensor_streams/assets/alignment_timeline.png b/docs/api/sensor_streams/assets/alignment_timeline.png
new file mode 100644
index 0000000000..235ddd7be0
--- /dev/null
+++ b/docs/api/sensor_streams/assets/alignment_timeline.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:cfea5a6aac40182b25decb9ddaeb387ed97a7708e2c51a48f47453c8df7adf57
+size 16136
diff --git a/docs/api/sensor_streams/assets/alignment_timeline2.png b/docs/api/sensor_streams/assets/alignment_timeline2.png
new file mode 100644
index 0000000000..2bf8ec5eef
--- /dev/null
+++ b/docs/api/sensor_streams/assets/alignment_timeline2.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:22b64923637d05f8f40c9f7c0f0597ee894dc4f31a0f10674aeb809101b54765
+size 23471
diff --git a/docs/api/sensor_streams/assets/alignment_timeline3.png b/docs/api/sensor_streams/assets/alignment_timeline3.png
new file mode 100644
index 0000000000..61ddc3b54b
--- /dev/null
+++ b/docs/api/sensor_streams/assets/alignment_timeline3.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:b8e9589dcd5308f511a2ec7d41bd36978204ccfe1441907bd139029b0489d605
+size 9969
diff --git a/docs/api/sensor_streams/temporal_alignment.md b/docs/api/sensor_streams/temporal_alignment.md
index aa72c3f59e..9484da7155 100644
--- a/docs/api/sensor_streams/temporal_alignment.md
+++ b/docs/api/sensor_streams/temporal_alignment.md
@@ -4,8 +4,7 @@ Robots have multiple sensors emitting data at different rates and latencies. A c
`align_timestamped` solves this by buffering messages and matching them within a time tolerance.
-
-diagram source
+Pikchr
```pikchr fold output=assets/alignment_overview.svg
color = white
@@ -22,59 +21,226 @@ arrow from Align.e right 0.4in
Out: box "(image, pointcloud)" rad 5px fit wid 170% ht 170%
```
+
+

-
## Basic Usage
-```python session=align
+Below we set up replay of real camera and lidar data from the Unitree Go2 robot; you can follow along if interested.
+
+
+Stream Setup
+
+You can read more about [sensor storage here](storage_replay.md) and [LFS data store here](/docs/data.md)
+
+```python session=align no-result
from reactivex import Subject
+from dimos.utils.testing import TimedSensorReplay
from dimos.types.timestamped import Timestamped, align_timestamped
+from reactivex import operators as ops
+import reactivex as rx
+
+# Load recorded Go2 sensor streams
+video_replay = TimedSensorReplay("unitree_go2_bigoffice/video")
+lidar_replay = TimedSensorReplay("unitree_go2_bigoffice/lidar")
+
+# This is a bit tricky: we find the first video frame timestamp, then add 2 seconds to it
+seek_ts = video_replay.first_timestamp() + 2
+
+# Lists to collect items as they flow through streams
+video_frames = []
+lidar_scans = []
+
+# We are using from_timestamp=... and not seek=... because seek seeks through recording
+# timestamps, while from_timestamp matches the actual message timestamp.
+# It's possible for sensor data to come in late, but with correct capture-time timestamps.
+video_stream = video_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
+ ops.do_action(lambda x: video_frames.append(x))
+)
-# Create streams
-camera = Subject()
-lidar = Subject()
+lidar_stream = lidar_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
+ ops.do_action(lambda x: lidar_scans.append(x))
+)
+
+```
+
+
+
+
+Streams would normally come from an actual robot into your module via `IN` inputs, [`detection/module3D.py`](/dimos/perception/detection/module3D.py#L11) is a good example of this.
+
+Assuming we have them, let's align them.
-# Align camera frames with lidar scans
+```python session=align
+# Align video (primary) with lidar (secondary)
# match_tolerance: max time difference for a match (seconds)
# buffer_size: how long to keep messages waiting for matches (seconds)
-aligned = align_timestamped(
- camera,
- lidar,
- match_tolerance=0.1,
- buffer_size=2.0,
-)
+aligned_pairs = align_timestamped(
+ video_stream,
+ lidar_stream,
+ match_tolerance=0.025, # 25ms tolerance
+ buffer_size=5.0, # how long to wait for match
+).pipe(ops.to_list()).run()
+
+print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
+print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")
+
+# Show a matched pair
+if aligned_pairs:
+ img, pc = aligned_pairs[0]
+ dt = abs(img.ts - pc.ts)
+ print(f"\nFirst matched pair: Δ{dt*1000:.1f}ms")
+```
-results = []
-aligned.subscribe(lambda pair: results.append(pair))
+
+```
+Video: 29 frames, Lidar: 15 scans
+Aligned pairs: 11 out of 29 video frames
-# Helper to create timestamped messages
-class Msg(Timestamped):
- def __init__(self, ts: float, data: str):
- super().__init__(ts)
- self.data = data
+First matched pair: Δ11.3ms
+```
-# Emit some data
-camera.on_next(Msg(1.0, "frame_1"))
-camera.on_next(Msg(2.0, "frame_2"))
+
+Visualization helper
+
+```python session=align fold no-result
+import matplotlib
+import matplotlib.pyplot as plt
+
+def plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, path):
+ """Single timeline: video above axis, lidar below, green lines for matches."""
+ matplotlib.use('Agg')
+ plt.style.use('dark_background')
+
+ # Get base timestamp for relative times (frames have .ts attribute)
+ base_ts = video_frames[0].ts
+ video_ts = [f.ts - base_ts for f in video_frames]
+ lidar_ts = [s.ts - base_ts for s in lidar_scans]
+
+ # Find matched timestamps
+ matched_video_ts = set(img.ts for img, _ in aligned_pairs)
+ matched_lidar_ts = set(pc.ts for _, pc in aligned_pairs)
+
+ fig, ax = plt.subplots(figsize=(12, 2.5))
+
+ # Video markers above axis (y=0.3) - circles, cyan when matched
+ for frame in video_frames:
+ rel_ts = frame.ts - base_ts
+ matched = frame.ts in matched_video_ts
+ ax.plot(rel_ts, 0.3, 'o', color='cyan' if matched else '#688', markersize=8)
+
+ # Lidar markers below axis (y=-0.3) - squares, orange when matched
+ for scan in lidar_scans:
+ rel_ts = scan.ts - base_ts
+ matched = scan.ts in matched_lidar_ts
+ ax.plot(rel_ts, -0.3, 's', color='orange' if matched else '#a86', markersize=8)
+
+ # Green lines connecting matched pairs
+ for img, pc in aligned_pairs:
+ img_rel = img.ts - base_ts
+ pc_rel = pc.ts - base_ts
+ ax.plot([img_rel, pc_rel], [0.3, -0.3], '-', color='lime', alpha=0.6, linewidth=1)
+
+ # Axis styling
+ ax.axhline(y=0, color='white', linewidth=0.5, alpha=0.3)
+ ax.set_xlim(-0.1, max(video_ts + lidar_ts) + 0.1)
+ ax.set_ylim(-0.6, 0.6)
+ ax.set_xlabel('Time (s)')
+ ax.set_yticks([0.3, -0.3])
+ ax.set_yticklabels(['Video', 'Lidar'])
+ ax.set_title(f'{len(aligned_pairs)} matched from {len(video_frames)} video + {len(lidar_scans)} lidar')
+ plt.tight_layout()
+ plt.savefig(path, transparent=True)
+ plt.close()
+```
-# Lidar arrives - matches frame_1 (within 0.05s tolerance)
-lidar.on_next(Msg(1.05, "scan_1"))
-print(f"matched: {results[-1][0].data} <-> {results[-1][1].data}")
+
+
+```python session=align output=assets/alignment_timeline.png
+plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, '{output}')
+```
+
+
+
+
+If we loosen our match tolerance, we might get multiple pairs matching the same lidar frame.
-# Lidar arrives - matches frame_2
-lidar.on_next(Msg(1.98, "scan_2"))
-print(f"matched: {results[-1][0].data} <-> {results[-1][1].data}")
+```python session=align
+aligned_pairs = align_timestamped(
+ video_stream,
+ lidar_stream,
+ match_tolerance=0.05, # 50ms tolerance
+ buffer_size=5.0, # how long to wait for match
+).pipe(ops.to_list()).run()
+
+print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
+print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")
```
```
-matched: frame_1 <-> scan_1
-matched: frame_2 <-> scan_2
+Video: 58 frames, Lidar: 30 scans
+Aligned pairs: 23 out of 58 video frames
+```
+
+
+```python session=align output=assets/alignment_timeline2.png
+plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, '{output}')
+```
+
+
+
+
+## We can combine frame alignment with a quality filter
+
+more on [quality filtering here](quality_filter.md)
+
+```python session=align
+from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier
+
+# Lists to collect items as they flow through streams
+video_frames = []
+lidar_scans = []
+
+video_stream = video_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
+ sharpness_barrier(3.0),
+ ops.do_action(lambda x: video_frames.append(x))
+)
+
+lidar_stream = lidar_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
+ ops.do_action(lambda x: lidar_scans.append(x))
+)
+
+aligned_pairs = align_timestamped(
+ video_stream,
+ lidar_stream,
+ match_tolerance=0.025, # 25ms tolerance
+ buffer_size=5.0, # how long to wait for match
+).pipe(ops.to_list()).run()
+
+print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
+print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")
+
+```
+
+
+```
+Video: 6 frames, Lidar: 15 scans
+Aligned pairs: 1 out of 6 video frames
+```
+
+```python session=align output=assets/alignment_timeline3.png
+plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, '{output}')
```
+
+
+
+We are very picky, but the data is high quality: the best frame, with the closest lidar match in this window.
+
## How It Works
The primary stream (first argument) drives emissions. When a primary message arrives:
@@ -107,11 +273,11 @@ Buffer: box "Buffer" "primary" rad 5px fit wid 170% ht 170%
text "waiting..." at (Buffer.w.x - 0.4in, Buffer.w.y - 0.15in)
```
+
+

-
-
## Parameters
| Parameter | Type | Default | Description |
@@ -121,90 +287,7 @@ text "waiting..." at (Buffer.w.x - 0.4in, Buffer.w.y - 0.15in)
| `match_tolerance` | `float` | 0.1 | Max time difference for a match (seconds) |
| `buffer_size` | `float` | 1.0 | How long to buffer unmatched messages (seconds) |
-## Multiple Secondary Streams
-
-Align a primary with multiple secondaries - the result tuple contains all matched messages:
-
-```python session=align
-# New streams
-camera2 = Subject()
-lidar2 = Subject()
-imu = Subject()
-
-aligned_multi = align_timestamped(
- camera2,
- lidar2,
- imu,
- match_tolerance=0.05,
- buffer_size=1.0,
-)
-
-multi_results = []
-aligned_multi.subscribe(lambda x: multi_results.append(x))
-
-# All three must arrive within tolerance
-camera2.on_next(Msg(1.0, "frame"))
-lidar2.on_next(Msg(1.02, "scan"))
-# Still waiting for IMU...
-print(f"results so far: {len(multi_results)}")
-
-imu.on_next(Msg(1.03, "imu_reading"))
-print(f"after IMU: {len(multi_results)}")
-print(f"matched: ({multi_results[0][0].data}, {multi_results[0][1].data}, {multi_results[0][2].data})")
-```
-
-
-```
-results so far: 0
-after IMU: 1
-matched: (frame, scan, imu_reading)
-```
-
-## With Backpressure
-
-In practice, you often combine alignment with [`backpressure`](/docs/api/sensor_streams/advanced_streams.md) for slow processors:
-
-```python session=align
-from dimos.utils.reactive import backpressure
-from reactivex.scheduler import ThreadPoolScheduler
-from reactivex import operators as ops
-import time
-scheduler = ThreadPoolScheduler(max_workers=2)
-
-# Simulated streams
-fast_camera = Subject()
-fast_lidar = Subject()
-
-# Slow processing that needs the latest aligned pair
-def slow_process(pair):
- frame, scan = pair
- time.sleep(0.1) # Simulate slow ML inference
- return f"processed_{frame.data}"
-
-# backpressure ensures slow_process gets latest pair, not queued old ones
-processed = backpressure(
- align_timestamped(fast_camera, fast_lidar, match_tolerance=0.1),
- scheduler=scheduler
-).pipe(ops.map(slow_process))
-
-slow_results = []
-processed.subscribe(lambda x: slow_results.append(x))
-
-# Rapid emissions
-for i in range(5):
- fast_camera.on_next(Msg(float(i), f"f{i}"))
- fast_lidar.on_next(Msg(float(i) + 0.01, f"s{i}"))
-
-time.sleep(0.5)
-print(f"processed {len(slow_results)} pairs (skipped {5 - len(slow_results)})")
-scheduler.executor.shutdown(wait=True)
-```
-
-
-```
-processed 2 pairs (skipped 3)
-```
## Usage in Modules
@@ -228,57 +311,3 @@ class Detection3DModule(Detection2DModule):
```
The 2D detection stream (camera + ML model) is the primary, matched with raw pointcloud data from lidar. The longer `buffer_size=20.0` accounts for variable ML inference times.
-
-## Edge Cases
-
-### Unmatched Messages
-
-Messages that can't be matched within tolerance are dropped:
-
-```python session=align
-camera3 = Subject()
-lidar3 = Subject()
-
-dropped = align_timestamped(camera3, lidar3, match_tolerance=0.05, buffer_size=1.0)
-
-drop_results = []
-dropped.subscribe(lambda x: drop_results.append(x))
-
-# These won't match - timestamps too far apart
-camera3.on_next(Msg(1.0, "frame"))
-lidar3.on_next(Msg(1.2, "scan")) # 0.2s diff > 0.05s tolerance
-
-print(f"matches: {len(drop_results)}")
-```
-
-
-```
-matches: 0
-```
-
-### Buffer Expiry
-
-Old buffered primaries are cleaned up when secondaries progress past them:
-
-```python session=align
-camera4 = Subject()
-lidar4 = Subject()
-
-expired = align_timestamped(camera4, lidar4, match_tolerance=0.05, buffer_size=0.5)
-
-exp_results = []
-expired.subscribe(lambda x: exp_results.append(x))
-
-# Primary at t=1.0 waiting for secondary
-camera4.on_next(Msg(1.0, "old_frame"))
-
-# Secondary arrives much later - primary is no longer matchable
-lidar4.on_next(Msg(2.0, "late_scan"))
-
-print(f"matches: {len(exp_results)}") # old_frame expired
-```
-
-
-```
-matches: 0
-```