Realtime XArm driver for joint positiona and velocitt control#702
Realtime XArm driver for joint positiona and velocitt control#702
Conversation
| # NOTE: velocities are in degrees/second, not radians! | ||
| # Always send velocity commands (velocity changes frequently) | ||
| code = self.arm.vc_set_joint_velocity( | ||
| joint_cmd, False, self.config.velocity_duration |
There was a problem hiding this comment.
Bug here: joint_cmd is in radian, but this call expects degrees.
There was a problem hiding this comment.
@luqiang21 the xarm SDK uses radians when sending joint position commands,
But when sending velocity commands it expects inputs as deg/s
hence the difference here.
| self._stop_event = threading.Event() | ||
|
|
||
| # Subscription management | ||
| self._disposables = CompositeDisposable() |
There was a problem hiding this comment.
You should call this line in start() not here. Creating here will not allow the driver to receive command after a stop/start cycle.
Reason: stop() disposed CompositeDisposable(), when you call another start() , the new subscription will be added to an already-disposed CompositeDisposable().
| timeout_logged = False | ||
|
|
||
| # Send command if available | ||
| if joint_cmd is not None and len(joint_cmd) == self.config.num_joints: |
There was a problem hiding this comment.
You should check readiness of the robot before sending command, otherwise commands may fail silently or cause error accumulation.
| except Exception as e: | ||
| logger.error(f"[{self.config.ip_address}] Error recovery failed: {e}") | ||
|
|
||
| def _xarm_is_ready_write(self) -> bool: |
There was a problem hiding this comment.
Variables _last_state, _last_mode, _last_not_ready are used but may not be initialized before first use
They are only set when conditions are met, but if _xarm_is_ready_write() is called before any state changes, these variables don't exist
| if self.config.velocity_control: | ||
| zero_vel = [0.0] * self.config.num_joints | ||
| self.arm.vc_set_joint_velocity( | ||
| zero_vel, True, self.config.velocity_duration |
There was a problem hiding this comment.
When command timeout occurs, zero velocity is sent with is_radian=True (line 944)
But the control loop uses is_radian=False for velocity commands (line 959)
You should make them consistent by changing this like to False as well
| # Enable servo mode if configured | ||
| if self.config.enable_on_start: | ||
| logger.info("Enabling servo mode (mode=1)...") | ||
| code = self.arm.set_mode(1) # Servo mode |
There was a problem hiding this comment.
Velocity Mode Never Enabled
Problem:
- _initialize_arm() always sets servo mode (set_mode(1)) and expects mode 1, even when self.config.velocity_control is True.
- The control loop later sends vc_set_joint_velocity commands assuming mode 4, so the controller rejects every command (code 9).
Impact:
- Critical: velocity-control deployments can never leave servo mode, so no velocities are accepted.
Fix:
- Make _initialize_arm() check velocity_control like _attempt_error_recovery() does
There was a problem hiding this comment.
@luqiang21 in my testing i found that the velocity control and position control both work using the SERVO Mode (1) and they are not exclusive to their specific modes.
There was a problem hiding this comment.
@luqiang21 in my testing i found that the velocity control and position control both work using the SERVO Mode (1) and they are not exclusive to their specific modes.
Is it desired behavior?
|
@mustafab0 |
| return mock | ||
|
|
||
|
|
||
| def test_basic_connection(): |
There was a problem hiding this comment.
Shouldn't these be pytest tests? If they have to be executed manually, they are bound to break.
There was a problem hiding this comment.
I tried building this with pytest last time and couldnt figure it out, so went with this basic thing. I have added a new pytest now.
| # Check connection via RPC | ||
| logger.info("Checking connection via RPC...") | ||
| code, version = driver.get_version() | ||
| if code == 0: |
There was a problem hiding this comment.
This should be:
assert code == 0, "Failed to get firmware version"| logger.info(f"✓ Firmware version: {version}") | ||
| else: | ||
| logger.error(f"✗ Failed to get firmware version: code={code}") | ||
| dimos.stop() |
There was a problem hiding this comment.
You can use a fixture like this and not have to manually clean up:
@pytest.fixture
def dimos_cluster():
dimos = core.start(1)
try:
yield dimos
finally:
dimos.stop()| self._stop_event = threading.Event() | ||
|
|
||
| # Subscription management | ||
| self._disposables = CompositeDisposable() |
There was a problem hiding this comment.
self._disposables already exists on Module.
|
|
||
| logger.info(f"Robot state loop started at {self.config.robot_state_rate}Hz") | ||
|
|
||
| while self._running: |
There was a problem hiding this comment.
Do you need self._running if you have self._stop_event? Events are thread safe. This could just be
while not self._stop_event.is_set()| python test_xarm_driver.py --ip 192.168.1.235 | ||
| python interactive_control.py --ip 192.168.1.235 |
There was a problem hiding this comment.
It would be nice to have a blueprint for these commands. Picking up arguments from environment variables or CLI is supported.
| To test with real hardware using the current Alfred embodiment: | ||
|
|
||
| 1. **Turn on the Flowbase** (xArm controller) | ||
| 2. **SSH into dimensional-cpu-2:** |
There was a problem hiding this comment.
This is a bit overly specific to us. We should probably have internal docs somewhere else.
| Component providing state query RPC methods for XArmDriver. | ||
|
|
||
| This component assumes the parent class has: | ||
| - self.arm: XArmAPI instance | ||
| - self.config: XArmDriverConfig instance | ||
| - self._joint_state_lock: threading.Lock | ||
| - self._joint_states_: Optional[JointState] | ||
| - self._robot_state_: Optional[RobotState] |
There was a problem hiding this comment.
I'm a bit unsure about the architecture here. It seems like XArmDriver is a single class which had it's methods moved into five mixins: MotionControlComponent, StateQueryComponent, SystemControlComponent, KinematicsComponent, GripperControlComponent.
I don't think they are real components as they are not used independently in other places and the dependencies are described in comments like here.
Having a single class would make it a bit harder to read, but it would make it more robust as you can depend on mypy for typechecking instead of comments.
There was a problem hiding this comment.
Thanks for taking a look!
The reason for making the five mixins was essentially to keep the main driver file simpler. The thought process behind this is as follows.
For most manipulation tasks to move the robot we are going to give it joint position commands or joint velocity commands from the main control loop thread. The mixins/components are essentially all the features that the xarm SDK exposes for the arm. Most of these will not be used in "planned tasks" but they are necessary, for example clear error etc.
I debated putting all the mixins in the main driver file but it got too long.
But further more another motivation for breaking them into components is to strategically categories them such that it becomes a standard template for creating drivers for other arms as well. As piper arm SDK does not have all the features that xarm SDK has and vice versa, it is easy to make additional mixins or omit whatever isn't available, while the main driver's position and velocity control remains largely unchanged.
There was a problem hiding this comment.
I completely agree with breaking large classes into reusable and interchangeable components, the problem is that mixins are not the best way to achieve that because they are not good at encapsulation and providing interfaces.
For example, instead of assuming that self.arm exists in the MotionControlComponent mixin, you can change it to an actual class that accepts arm
class XArmDriver:
def __init__(...):
self.arm = ...
self.motion_control = MotionControlComponent(self.arm)Currently, MotionControlComponent reaches into the internals of XArmDriver, for example by setting self._joint_cmd_. That's why I'm saying XArmDriver is really a single class split into multiple files. For components to be reusable, they need to have an established interface. This can be verified statically so we don't end up altering one implementation without the others.
But if you want to keep mixins, you can change this comment to a Protocol.
Something like:
class StateQueryHost(Protocol):
arm: XArmAPI
config: XArmDriverConfig
_joint_state_lock: threading.Lock
_joint_states_: JointState | None
_robot_state_: RobotState | Noneand then annotate the self arguments.
class StateQueryComponent:
def get_joint_state(self: StateQueryHost) -> JointState | None:There was a problem hiding this comment.
I'm not saying it should be done now, though. This PR is quite large and I don't want to block you. It would be nice to do it in a subsequent PR though. :)
| assert len(position) == 6, "Position should have 6 values [x,y,z,roll,pitch,yaw]" | ||
| assert position[0] == 200.0, "X position should match mock" | ||
|
|
||
| driver.stop() |
There was a problem hiding this comment.
Stop is already called by the driver fixture. I don't think you have to call it in every test.
| pass # Expected - no transport in unit test | ||
|
|
||
| driver.start() | ||
| time.sleep(1.0) |
There was a problem hiding this comment.
Is there a way to avoid the sleep by waiting for the specific event to happen?
The worst thing about sleep is that it takes the same amount of time no matter how many cores you have. :D
…lishing after trajectory completion
… deprecated type annotations to use Python 3.10+ syntax
…reads running and stop event
f32afab to
e7392b8
Compare
The xArm SDK returns 7 joints even when init_axis=6 is specified, including the gripper as joint 7. This caused the trajectory controller to send 7-joint commands which were rejected by the driver. Fix: Truncate SDK joint data to num_joints in both new and old firmware paths, ensuring only the configured number of arm joints are published. This fixes the issue where trajectories wouldn't execute because commands had the wrong number of joints. Works for xarm5, xarm6, and xarm7. Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
|
Why all your commits' time changed? It will be hard for reviewers to know what you changed to resolve their comments. |
| @@ -0,0 +1,385 @@ | |||
| # Copyright 2025 Dimensional Inc. | |||
There was a problem hiding this comment.
This dir is deprecated so look at how unit tests implemented w pytset throughout the codebase.
I did a rebase and things got messy. Sorry about that. |
|
Updated driiver architecture is found here: #831 |
This PR does the following:
Subscribed:
/xarm/joint_position_command- JointCommand (positions in radians)/xarm/joint_velocity_command- JointCommand (velocities in deg/s)Published:
/xarm/joint_states- JointState (100Hz)/xarm/robot_state- RobotState (10Hz)/xarm/ft_ext,/xarm/ft_raw- WrenchStamped (force/torque)With Mock Hardware (No Physical Robot)