Reads a protobuf-encoded MCAP robotics log file and computes a per-timestep composite reward for sidewalk navigation:
r = r_centering + r_heading + r_speed + r_obstacle + r_jerk + r_acc + r_collision
# Activate the virtual environment
source .venv/bin/activate
# Basic run (prints summary to stdout)
python calculate_reward.py path/to/file.mcap
# Write per-timestep results to CSV
python calculate_reward.py path/to/file.mcap --csv rewards_output.csv- Python 3.8+
- numpy — vector math (dot products, norms, array operations)
- mcap-protobuf-support — reads and deserializes protobuf messages from
.mcapfiles
Install into the virtual environment:
python -m venv .venv
source .venv/bin/activate
pip install numpy mcap mcap-protobuf-supportThe script reads six topics from the MCAP file. All other topics are ignored.
| Topic | Protobuf Type | Publish Rate | Purpose |
|---|---|---|---|
/cognition/sta_boundary |
pmx.STABoundary |
~3 Hz | Sidewalk centerline, left boundary, and right boundary as polylines (~12 points each, ~1.5 m spacing, ~16 m lookahead) |
/odom |
pmx.Odometry |
~20 Hz | Robot pose (position + quaternion orientation) and twist (linear + angular velocity). Primary timeline — one reward is computed per odom message |
/speed_limit |
pmx.msgs.speed_governor.SpeedLimit |
Sporadic | Target maximum speed (max_speed field, in m/s) |
/move_serve/planner_state |
pmx.msgs.move_serve.PlannerState |
~10 Hz | Obstacle distances via plan_metrics sub-message (distance to closest stationary and dynamic obstacles) |
/move_serve/imu_jerk_filtered |
pmx.msgs.std_msgs.FloatValueStamped |
~10 Hz | Filtered jerk scalar (rate of change of acceleration, from IMU) |
/move_serve/proximity/state |
pmx.msgs.proximity.State |
~10 Hz | Collision state via in_collision boolean field |
Topics publish at different rates. The script uses /odom as the primary clock and iterates over every odom message. For each odom timestamp, it performs a binary search on each of the other four topic lists to find the message closest in time. This is handled by the find_closest() function, which runs in O(log n) per lookup.
All messages are stored as TimedValue dataclass instances that pair a float timestamp (seconds since Unix epoch) with the decoded protobuf message. Lists are sorted by timestamp after loading, which is a prerequisite for the binary search.
Protobuf timestamps have two integer fields: seconds (Unix epoch) and nanos (0–999999999). The proto_ts() helper combines them into a single float for arithmetic:
float_time = seconds + nanos * 1e-9
At each timestep, three quantities are extracted from the odom message:
- Position (
robot_xy) — 2D ground-plane position frompose.translation.xandpose.translation.y(metres, global frame). The z-component is ignored since all reward geometry is 2D. - Yaw (
robot_yaw) — heading angle in radians (−π to +π) extracted from the pose quaternion (pose.rotation) using the standard quaternion-to-Euler conversion viaquat_to_yaw(). The formula isolates yaw from the quaternion's (x, y, z, w) components usingatan2(2(wz + xy), 1 - 2(y² + z²)). - Speed — ground-plane speed magnitude from
sqrt(twist.linear.x² + twist.linear.y²)(m/s).
There is no direct acceleration topic in the MCAP file. Instead, acceleration is derived from odom speed using finite differences:
acceleration = (speed_current - speed_previous) / (t_current - t_previous)
The first timestep defaults to 0.0 acceleration since there is no previous sample. A guard ensures the time delta is non-zero (> 1e-6 s) to avoid division by zero.
Range: [0.0, 1.0]
Sources: /cognition/sta_boundary + /odom
Rewards the robot for staying near the center of the sidewalk. The calculation:
- Project the robot's position onto the centerline, left boundary, and right boundary polylines using
closest_point_on_polyline()to getdist_to_center,dist_to_left, anddist_to_right. - Estimate the sidewalk half-width as
(dist_to_left + dist_to_right) / 2. - Compute
ratio = dist_to_center / half_width, clamped to [0, 1]. - Return
1.0 - ratio.
The result is 1.0 when perfectly centered and falls linearly to 0.0 at the boundary edge.
Range: [−1.0, +1.0]
Sources: /cognition/sta_boundary + /odom
Rewards the robot for heading in the direction of intended travel. The intended direction is defined as the vector from the robot's position to a point 3 metres ahead along the centerline polyline (obtained from polyline_direction_at()). This 3m lookahead smooths out sharp segment-to-segment direction changes and better represents where the robot should be aiming. The robot's heading is converted from yaw to a 2D unit vector [cos(yaw), sin(yaw)].
The reward is the dot product of the two unit vectors, which equals cos(angle_between_them):
- +1.0 — perfectly aligned with the intended direction
- 0.0 — perpendicular to the intended direction
- −1.0 — facing directly backwards
Range: [0.0, 1.0]
Sources: /odom + /speed_limit
Rewards the robot for travelling at the target max speed. The formula:
r_speed = max(0.0, 1.0 - |current_speed - max_speed| / max_speed)
This gives 1.0 when speed equals max_speed, and falls off linearly in both directions (too slow or too fast). The reward reaches 0.0 when speed is 0 or 2× max_speed, and is clamped so it never goes negative.
If max_speed is ~0, the reward is 1.0 only if the robot is also stopped.
Range: [−5.0, 0.0]
Sources: /move_serve/planner_state
A large negative reward for being too close to obstacles. Uses the planner's pre-computed distances to the nearest stationary obstacle (walls, poles, curbs) and dynamic obstacle (pedestrians, cars, bikes). The minimum of the two is used.
Three zones with tunable thresholds:
| Zone | Distance | Penalty |
|---|---|---|
| Safe | ≥ 1.0 m | 0.0 (no penalty) |
| Transition | 0.2 m – 1.0 m | Linear interpolation from −5.0 to 0.0 |
| Critical | ≤ 0.2 m | −5.0 (full penalty) |
If the planner reports no obstacles (distance fields are 0 or absent), the penalty is 0.0.
Range: (−∞, 0.0]
Sources: /move_serve/imu_jerk_filtered
Penalises change in acceleration (jerk) to encourage smooth motion. The IMU provides a pre-filtered jerk scalar value.
r_jerk = -0.5 * |jerk_value|
The scaling coefficient (0.5) controls penalty strength. The reward is always ≤ 0.
Range: (−∞, 0.0]
Sources: Derived from /odom speed via finite differences
Penalises large accelerations to discourage harsh speed changes.
r_acc = -0.3 * |acceleration|
The scaling coefficient (0.3) controls penalty strength. The reward is always ≤ 0.
Range: [−10.0, 0.0]
Sources: /move_serve/proximity/state
A severe negative reward for being in collision. Uses the in_collision boolean field from the proximity state message. This penalty is particularly important for non-holonomic robots where collisions can cause wheel slip or require complex recovery maneuvers.
r_collision = -10.0 if in_collision else 0.0
The penalty is designed to strongly dominate the reward signal during collision events, ensuring the robot learns to avoid collisions at all costs.
The STA boundary data stores sidewalk edges as polylines — ordered lists of 2D points connected by line segments. Sidewalk edges curve, bend at corners, and have irregular widths, so a single line segment cannot represent them.
Finds the closest point on a polyline to a given query point. For each segment in the polyline:
- Compute the segment direction vector
ab = b - a. - Project the query point onto the infinite line through
aandbusing the formulat = dot(pt - a, ab) / dot(ab, ab). - Clamp
tto [0, 1] so the projection stays on the finite segment (not the infinite extension). - Compute the projected point
a + t * aband its distance to the query point. - Track the minimum distance across all segments.
Returns the closest point, the distance, and the index of the closest segment.
Returns a unit direction vector representing the intended heading (used for the heading reward). The direction is computed as follows:
- Project the robot's position onto the polyline to find the closest point (
proj_pt). - Walk forward along the polyline from
proj_ptfor a configurable lookahead distance (default 3 m), stepping through segments and consuming distance until the target point is reached. If the polyline ends before the full distance, the last point is used. - Compute the direction from the robot's actual position to the 3m-ahead target point, and normalize to a unit vector.
This approach is more stable than using a single segment's direction because it looks ~2 segments ahead (~1.5 m each), smoothing out kinks at segment boundaries. It also naturally accounts for upcoming curves and nudges the robot back toward center if it has drifted off the centerline.
After processing all odom timesteps, the script prints:
- Message counts per topic (to verify data loaded correctly)
- Reward summary table with mean, standard deviation, min, and max for each component and the total reward
- Average speed (m/s) and average |acceleration| (m/s²)
When --csv is provided, writes one row per timestep with columns:
| Column | Description |
|---|---|
time |
Timestamp in seconds (Unix epoch float) |
r_centering |
Centering reward [0, 1] |
r_heading |
Heading alignment reward [−1, 1] |
r_speed |
Speed matching reward [0, 1] |
r_obstacle |
Obstacle penalty [−5, 0] |
r_jerk |
Jerk penalty (−∞, 0] |
r_acc |
Acceleration penalty (−∞, 0] |
r_collision |
Collision penalty [−10, 0] |
r_total |
Sum of all seven components |
speed |
Raw speed in m/s |
acceleration |
Raw acceleration in m/s² |
All reward component constants are defined as global variables at the top of calculate_reward.py for easy modification:
| Constant | Default | Purpose |
|---|---|---|
SAFE_DIST |
1.0 m | No obstacle penalty above this distance |
CRITICAL_DIST |
0.2 m | Full obstacle penalty at or below this distance |
MAX_PENALTY |
−5.0 | Maximum obstacle penalty value |
COLLISION_PENALTY |
−10.0 | Severe penalty for collision events |
JERK_SCALE |
0.5 | Jerk penalty scaling coefficient |
ACC_SCALE |
0.3 | Acceleration penalty scaling coefficient |
LOOKAHEAD_DIST |
3.0 m | How far ahead along the centerline to look for heading direction |
The script validates that the MCAP file contains all required topics before processing:
- Required Topics: All six topics listed in the table above must be present
- Error Handling: If any required topics are missing, the script will:
- Display the missing topics
- List all available topics found in the file
- Exit with error code 1
- File Validation: Also checks that the MCAP file can be read and is not corrupted