inferaxis is a unified-data-interface, dynamically latency-adaptive inference
system for embodied control. It standardizes observations into `Frame`,
actions into `Action`, and keeps the outer execution loop stable through
`run_step(...)` and `InferenceRuntime(...)`.
The point of the project is simple: once your data matches the shared runtime interface, the same loop can drive:
- normal sync inference
- async chunked inference
- dynamically latency-adaptive chunk scheduling
- local data collection
- replay of recorded actions
- sync-latency profiling and runtime recommendation
inferaxis is not a robot middleware, transport stack, or deployment system.
It focuses on the inference-side data contract and control loop.
```shell
git clone https://github.com/zywang03/inferaxis.git
cd inferaxis
pip install .
```

inferaxis is numpy-based inside the core runtime. Images, state, and action
payloads are normalized to `numpy.ndarray`.
The public surface is intentionally small:
- `Frame`
- `Action`
- `Command`
- `run_step(...)`
- `InferenceRuntime(...)`
- `RealtimeController`
The runtime call boundary is:
- `observe_fn() -> Frame`
- `act_fn(action) -> Action | None`
- `act_src_fn(frame, request) -> Action | list[Action]`
Returning one Action means chunk size 1. Returning list[Action] lets the
same source participate in async chunk scheduling.
```python
import inferaxis as infra
import numpy as np


class YourExecutor:
    def get_obs(self):
        return infra.Frame(
            images={"front_rgb": np.zeros((2, 2, 3), dtype=np.uint8)},
            state={
                "left_arm": np.zeros(6, dtype=np.float64),
                "left_gripper": np.array([0.5], dtype=np.float64),
                "right_arm": np.zeros(6, dtype=np.float64),
                "right_gripper": np.array([0.5], dtype=np.float64),
            },
        )

    def send_action(self, action):
        return action


class YourPolicy:
    def infer(self, frame, request):
        del frame, request
        return infra.Action(
            commands={
                "left_arm": infra.Command(
                    command=infra.BuiltinCommandKind.CARTESIAN_POSE_DELTA,
                    value=np.zeros(6, dtype=np.float64),
                ),
                "left_gripper": infra.Command(
                    command=infra.BuiltinCommandKind.GRIPPER_POSITION,
                    value=np.array([0.5], dtype=np.float64),
                ),
                "right_arm": infra.Command(
                    command=infra.BuiltinCommandKind.CARTESIAN_POSE_DELTA,
                    value=np.zeros(6, dtype=np.float64),
                ),
                "right_gripper": infra.Command(
                    command=infra.BuiltinCommandKind.GRIPPER_POSITION,
                    value=np.array([0.5], dtype=np.float64),
                ),
            }
        )


executor = YourExecutor()
policy = YourPolicy()

result = infra.run_step(
    observe_fn=executor.get_obs,
    act_fn=executor.send_action,
    act_src_fn=policy.infer,
)
```

If you only want normalized frame -> action inference:
```python
result = infra.run_step(
    frame=my_frame,
    act_src_fn=policy.infer,
    execute_action=False,
)
```

Frame is the normalized observation container:
```python
frame = infra.Frame(
    images={"front_rgb": np.ndarray(...)},
    state={
        "left_arm": np.ndarray(...),
        "left_gripper": np.ndarray(...),
        "right_arm": np.ndarray(...),
        "right_gripper": np.ndarray(...),
    },
)
```

Action is the normalized control container:
```python
action = infra.Action(
    commands={
        "left_arm": infra.Command(
            command=infra.BuiltinCommandKind.CARTESIAN_POSE_DELTA,
            value=np.ndarray(...),
        ),
        "left_gripper": infra.Command(
            command=infra.BuiltinCommandKind.GRIPPER_POSITION,
            value=np.ndarray(...),
        ),
    },
)
```

Key runtime rules:
- `observe_fn()` must return `inferaxis.Frame`.
- `act_src_fn(frame, request)` must return `inferaxis.Action` or `list[inferaxis.Action]`.
- `act_fn(action)` receives `inferaxis.Action`.
- `timestamp_ns` and `sequence_id` are generated by inferaxis internally.
command is not a free string. It must match the declared command kind for that
component. Built-ins include:
`joint_position`, `joint_position_delta`, `joint_velocity`, `cartesian_pose`,
`cartesian_pose_delta`, `cartesian_twist`, `gripper_position`,
`gripper_position_delta`, `gripper_velocity`, `gripper_open_close`,
`hand_joint_position`, `hand_joint_position_delta`, `eef_activation`
Project-specific command kinds can be registered as `custom:...`.
run_step(...) is the single outer loop entrypoint. InferenceRuntime(...)
adds optimization and scheduling without changing that outer call style.
```python
runtime = infra.InferenceRuntime(
    mode=infra.InferenceMode.ASYNC,
    steps_before_request=0,
    warmup_requests=1,
    profile_delay_requests=3,
    realtime_controller=infra.RealtimeController(hz=50.0),
)

result = infra.run_step(
    observe_fn=executor.get_obs,
    act_fn=executor.send_action,
    act_src_fn=policy.infer,
    runtime=runtime,
)
```

This lets the same data interface support:
- sync and async chunk execution
- async chunk scheduling with front-triggered `steps_before_request`
- chunk handoff blending via `ensemble_weight=...`
- paced closed-loop execution
- latency profiling against a required target control hz via `profile_sync_inference(...)`
- mode recommendation via `recommend_inference_mode(...)`
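As a rough sketch of the decision the profiler enables: sync inference only fits when one inference call completes within a control period. The helper below is hypothetical logic, not the actual `recommend_inference_mode(...)` implementation, which may weigh more factors.

```python
# Hypothetical sketch of a sync-vs-async decision from profiled latency.
# Not the actual recommend_inference_mode(...) logic.

def recommend_mode(mean_latency_s: float, target_hz: float) -> str:
    """Recommend "sync" only if one inference call fits in a control period."""
    control_period_s = 1.0 / target_hz
    return "sync" if mean_latency_s <= control_period_s else "async"

assert recommend_mode(0.008, 50.0) == "sync"   # 8 ms fits a 20 ms period
assert recommend_mode(0.120, 50.0) == "async"  # 120 ms blows the budget
```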
When mode=ASYNC, no manual latency seed is needed anymore. If you attach a
RealtimeController(...), inferaxis first issues request-only warmup calls for
warmup_requests, then profiles delay across profile_delay_requests
requests, converts that to control-step latency, and only then starts sending
actions to the robot. This bootstrap happens automatically on the first
run_step(...) call once observe_fn and act_src_fn are available.
Because of that startup warmup, policy.infer(...) should derive chunks from
frame and request instead of relying on mutable call-count state.
If you want startup warmup/profile to happen outside the first run_step(...)
call, use runtime.bootstrap_async(...) once before entering the loop.
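Because warmup and profile requests replay through the same infer path, a policy that derives its chunk purely from its inputs stays reproducible. A minimal sketch, with plain numpy arrays standing in for `infra.Action` payloads and a hypothetical interpolation target (the chunk logic here is illustrative, not the library's API):

```python
import numpy as np

# Sketch: derive a fixed-length action chunk purely from the observed
# state and a target, with numpy arrays standing in for infra.Action
# payloads. `chunk_len` and the interpolation scheme are illustrative.

def infer_chunk(state: np.ndarray, target: np.ndarray, chunk_len: int = 4):
    """Return `chunk_len` deltas that move `state` toward `target`.

    No mutable call-count state: calling this twice with the same inputs
    (as warmup/profile requests may do) yields the same chunk.
    """
    waypoints = np.linspace(state, target, chunk_len + 1)[1:]
    return [waypoints[i] - (waypoints[i - 1] if i else state)
            for i in range(chunk_len)]

chunk = infer_chunk(np.zeros(6), np.ones(6))
assert len(chunk) == 4
# The deltas sum back to the full displacement toward the target.
assert np.allclose(np.sum(chunk, axis=0), np.ones(6))
```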
When enable_rtc=True, policy.infer(...) receives the RTC hints directly on
request.prev_action_chunk, request.inference_delay, and
request.execute_horizon. The same values are also mirrored on
request.rtc_args for grouped access:
- `prev_action_chunk`: a fixed-length raw chunk built from the current live buffer head, left-padded with the first live action until it reaches the locked source chunk length
- `inference_delay`: the estimated number of raw chunk steps from request launch until the new chunk can begin taking effect, clamped into the current RTC execution horizon
- `execute_horizon`: the fixed raw-step RTC execution window (`execution_steps`), so the effective RTC interval is `[inference_delay, execute_horizon)`
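A sketch of the left-padding rule, assuming a plain list of per-step arrays for the live buffer head (illustrative, not inferaxis internals):

```python
import numpy as np

# Sketch of how a prev_action_chunk hint could be assembled: take the
# live buffer head and left-pad with the first live action until it
# reaches the locked source chunk length. Not inferaxis internals.

def build_prev_chunk(live_buffer: list, chunk_len: int) -> np.ndarray:
    pad = chunk_len - len(live_buffer)
    padded = [live_buffer[0]] * pad + list(live_buffer)
    return np.stack(padded[:chunk_len])

live = [np.full(6, v) for v in (0.1, 0.2, 0.3)]
prev = build_prev_chunk(live, chunk_len=5)
assert prev.shape == (5, 6)
assert np.allclose(prev[0], 0.1) and np.allclose(prev[1], 0.1)  # left-padded

# The effective RTC interval is [inference_delay, execute_horizon):
inference_delay, execute_horizon = 2, 5
assert list(range(inference_delay, execute_horizon)) == [2, 3, 4]
```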
If you need to manually nudge the async latency hint carried on requests, set
latency_steps_offset=.... This applies a signed raw-step offset to the
request-facing latency hint (request.latency_steps and, with enable_rtc=True,
request.inference_delay). This does not change steps_before_request,
stale-prefix drop, or execution smoothing.
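The arithmetic is just a signed shift of the request-facing hint. In this sketch the clamp at zero is an assumption on my part, not documented behavior:

```python
# Sketch of latency_steps_offset: a signed raw-step shift applied to the
# latency hint carried on requests. Clamping at zero is assumed here.

def offset_latency_hint(latency_steps: int, latency_steps_offset: int) -> int:
    return max(0, latency_steps + latency_steps_offset)

assert offset_latency_hint(5, -2) == 3  # policy sees a smaller delay hint
assert offset_latency_hint(5, +2) == 7  # or a larger one
```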
During cold start, the very first RTC bootstrap request is sent without RTC
args so inferaxis can lock the source chunk length and seed the first RTC
context. Later warmup/profile requests already send prev_action_chunk,
letting the server warm up that path before the first executable chunk is
accepted. If that last RTC warmup request takes more than 500ms, inferaxis
warns and asks whether startup should continue. This still avoids needing
robot.get_spec() or any extra bootstrap length config.
For chunked async execution, inferaxis now starts the next request from the
front of the currently active raw chunk. Once a new chunk is accepted, the
runtime waits until steps_before_request raw actions from that chunk have been
executed, then launches the next request. steps_before_request=0 means the next
request starts immediately when the chunk is integrated.
H_hat still starts from the startup delay profiled over
profile_delay_requests requests and is then updated online as an EMA of
observed request latency measured directly in control steps. When a reply
arrives, inferaxis drops the stale prefix and either switches to the aligned
new chunk directly or blends the aligned handoff prefix when
ensemble_weight=... is set. ensemble_weight may be one scalar shared by
every aligned handoff step or a (low, high) pair that ramps from the
earliest step to the latest. Built-in gripper commands switch to the new chunk
directly instead of being averaged. Blending applies only at the handoff
prefix; inferaxis does not apply an extra per-step temporal filter to every
emitted action. In practice, this makes inferaxis a
dynamically latency-adaptive inference system: request timing is updated online
from measured chunk latency instead of being fixed ahead of time.
ensemble_weight defaults to None. If it is omitted, inferaxis does not
blend aligned handoff actions and simply switches to the aligned new chunk.
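The update and handoff described above can be sketched as follows. The EMA coefficient and the direction of the blend weight (taken here to weight the new chunk) are assumptions; the actual values and schedule are internal to inferaxis, and the gripper-command exception is omitted:

```python
import numpy as np

# Sketch of the adaptive pieces described above. The EMA coefficient and
# blend direction are assumptions, not inferaxis internals; the built-in
# gripper-command exception is not modeled.

def update_h_hat(h_hat: float, observed_steps: float, alpha: float = 0.3) -> float:
    """EMA of observed request latency, measured in control steps."""
    return (1.0 - alpha) * h_hat + alpha * observed_steps

def handoff(new_chunk, old_tail, ensemble_weight):
    """Blend the aligned handoff prefix of `new_chunk` (stale prefix already
    dropped) with the remaining `old_tail`, or switch directly if None."""
    if ensemble_weight is None:          # default: switch directly
        return new_chunk
    if np.isscalar(ensemble_weight):     # one shared weight per step
        low = high = float(ensemble_weight)
    else:                                # (low, high) ramps earliest -> latest
        low, high = ensemble_weight
    overlap = min(len(old_tail), len(new_chunk))
    w = np.linspace(low, high, overlap)
    blended = [(1 - w[i]) * old_tail[i] + w[i] * new_chunk[i]
               for i in range(overlap)]
    return blended + list(new_chunk[overlap:])

h = update_h_hat(4.0, 6.0)               # estimate drifts toward observed 6
new = [np.full(6, 1.0)] * 4
old = [np.zeros(6)] * 2
out = handoff(new, old, ensemble_weight=(0.5, 1.0))
assert len(out) == 4
```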
`check_policy(...)` and `check_pair(...)` are dry-run validation helpers.

- They validate the interface contract.
- They issue at most one observation request and one policy inference call.
- They do not call `act_fn(...)`.
The public examples are fixed to these six paths:
- examples/01_sync_inference.py
- examples/02_async_inference.py
- examples/03_data_collection.py
- examples/04_replay_collected_data.py
- examples/05_profile_inference_latency.py
- examples/06_async_inference_with_rtc.py
Together they show the intended scope of the system: one shared data interface, one outer loop, multiple inference-time use cases.
More detail lives in docs/plain_objects_guide.md
and docs/examples_guide.md.