API reference

Generated automatically from the source docstrings. The public, stability-guaranteed surface is everything exported by robolens.__all__ (eval, eval_set, read_eval_log, EvalLog and the other log dataclasses); the sections below document the full framework.

Core types & spaces

types

Core observation/action data types exchanged between policy and embodiment.

These are the wire format of a rollout. They are deliberately small, immutable, and NumPy-native. Arrays are raw (the policy owns model-specific preprocessing); images are (H, W, C) uint8.

The dataclasses set eq=False because they carry NumPy arrays, whose element-wise == does not yield a single bool — identity/round-trip semantics are what callers actually need here.

Observation dataclass

Observation(images: Mapping[str, ImageArray] = dict(), state: Mapping[str, StateArray] = dict(), instruction: str | None = None, image_times: Mapping[str, float] = dict(), state_time: float = 0.0, extra: Mapping[str, Any] = dict())

A single multi-modal observation produced by an embodiment.

images are keyed by camera name; state holds proprioception keyed by a controlled vocabulary (e.g. "eef_pos", "gripper"). instruction is the language goal for this step (usually constant across an episode, but may change for long-horizon tasks).

Action dataclass

Action(data: StateArray, meta: Mapping[str, Any] = dict())

A single action to apply to an embodiment.

Semantics (control mode, rotation representation, gripper kind, frame) live on the action space, not on every action instance — see robolens.spaces.

ActionChunk dataclass

ActionChunk(actions: Sequence[Action], control_hz: float | None = None, inference_latency_s: float | None = None, meta: Mapping[str, Any] = dict())

A horizon of actions predicted by one policy inference.

Modern VLAs (π0, ACT, diffusion policies) predict H future actions that are executed open-loop because inference is slower than the control rate. H == 1 is the degenerate "reactive policy" case. control_hz is the rate the chunk was intended to be played at (None defers to the embodiment's native rate); inference_latency_s, when measured, is logged.

StepResult dataclass

StepResult(observation: Observation, reward: float | None = None, terminated: bool = False, termination_reason: str | None = None, truncated: bool = False, info: Mapping[str, Any] = dict())

The outcome of applying one action to an embodiment.

terminated means the task ended (success or hard failure); termination_reason disambiguates (e.g. "success", "collision", "fault", "out_of_bounds"). truncated means a time/horizon cutoff. A simulator may expose privileged success via info.

spaces

Action/observation spaces and action semantics.

Spaces describe the shape of actions and observations; ActionSemantics describes what an action means (control mode, rotation representation, gripper kind, reference frame). Semantics are what make compatibility checking real (a 7-DoF VLA vs a 6-DoF arm; delta vs absolute poses) and make temporal ensembling correct.

This module ships a minimal-but-functional core for the tracer slice; richer validation and the full StateSpec vocabulary are layered on in a later step without changing these signatures.

ActionSemantics dataclass

ActionSemantics(control_mode: ControlMode, rotation_repr: RotationRepr = 'none', gripper: GripperKind = 'none', frame: Frame = 'base')

What an action vector means. Attached to an action Box.

Box dataclass

Box(shape: tuple[int, ...], low: NDArray[floating[Any]] | None = None, high: NDArray[floating[Any]] | None = None, semantics: ActionSemantics | None = None)

A continuous box-shaped space. Optional low/high bounds and, for action spaces, ActionSemantics.

CameraSpec dataclass

CameraSpec(name: str, height: int, width: int, channels: int = 3)

An image stream an embodiment provides or a policy requires.

StateField dataclass

StateField(key: str, shape: tuple[int, ...], unit: str = '', dtype: str = 'float64')

One proprioception field: its key, shape, unit, and dtype.

StateSpec dataclass

StateSpec(fields: tuple[StateField, ...] = ())

A richer description of an embodiment's proprioception than a bare key set.

ObservationSpace dataclass

ObservationSpace(cameras: tuple[CameraSpec, ...] = (), state_keys: frozenset[str] = frozenset(), state: StateSpec | None = None)

The observations an embodiment provides / a policy requires.

state_keys is the compatibility-relevant set of proprioception keys. state optionally carries the richer StateSpec (shapes/units).

Policy & embodiment

policy

The Policy (VLA) interface — one of RoboLens's two swappable inputs.

A Policy is the "brain": given an Observation (plus the scene's instruction), it returns an ActionChunk to be executed open-loop.

The public contract is a runtime-checkable Policy Protocol so callers can wrap existing models without inheriting. PolicyBase is an optional convenience ABC with sane defaults.
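
As a rough illustration of what "runtime-checkable Protocol" buys a caller, the sketch below wraps a plain callable without inheriting from anything. `PolicyLike` and `WrappedModel` are hypothetical stand-ins, and the `act`/`reset` signatures are assumptions based on the calls that appear elsewhere on this page, not the real `Policy` definition.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PolicyLike(Protocol):
    """Hypothetical stand-in for the Policy protocol (signatures assumed)."""

    def reset(self, scene: Any) -> None: ...

    def act(self, observation: Any) -> Any: ...


class WrappedModel:
    """Adapts an existing model; note there is no base class."""

    def __init__(self, model: Any) -> None:
        self._model = model

    def reset(self, scene: Any) -> None:
        pass  # stateless: nothing to clear between scenes

    def act(self, observation: Any) -> Any:
        return self._model(observation)


policy = WrappedModel(lambda obs: [0.0] * 7)
is_policy = isinstance(policy, PolicyLike)        # structural match
is_not_policy = isinstance(object(), PolicyLike)  # lacks reset/act
```

A runtime `isinstance` check against a protocol only verifies that the methods exist; it does not validate their signatures or return types.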

PolicyConfig dataclass

PolicyConfig(action_horizon: int = 1, replan_interval: int | None = None, temperature: float | None = None)

Inference-time configuration, recorded in the eval log.

The VLA analog of Inspect's GenerateConfig: action-chunk handling and sampling knobs that affect reproducibility.

PolicyInfo dataclass

PolicyInfo(name: str, action_space: Box, observation_space: ObservationSpace = ObservationSpace(), control_hz: float | None = None)

Static description of a policy used for compatibility checking + logging.

Policy

Bases: Protocol

The VLA contract.

PolicyBase

Bases: ABC

Optional base class providing defaults; inherit only for the helpers.

reset

reset(scene: Scene) -> None

Default: stateless policies need no per-scene reset.

Source code in src/robolens/policy.py
def reset(self, scene: Scene) -> None:  # noqa: B027 - intentional no-op default
    """Default: stateless policies need no per-scene reset."""

embodiment

The Embodiment interface — RoboLens's second swappable input.

An Embodiment is the "body + world": a real robot or a simulator. It produces observations, executes actions, and owns the action/observation spaces, the native control rate, and reset/safety machinery.

Designed around real-robot reality: reset may drive to a home pose and block on human confirmation; there is no guaranteed privileged success oracle. Simulators are a stricter special case that opt into extra capabilities.

Per R1 (see the design doc): step() returns as soon as the command is issued and does NOT block for the control period — the framework owns pacing — unless the embodiment declares the "self_paced" capability.

EmbodimentInfo dataclass

EmbodimentInfo(name: str, action_space: Box, observation_space: ObservationSpace, control_hz: float | None = None, is_simulated: bool = False, capabilities: frozenset[Capability] = frozenset(), supported_setups: frozenset[str] = frozenset(), supported_target_kinds: frozenset[str] = frozenset())

Static description of an embodiment for compatibility checking + logging.

Embodiment

Bases: Protocol

The robot/simulator contract.

EmbodimentBase

Bases: ABC

Optional base class with a no-op close; inherit for the convenience.

close

close() -> None

Default: nothing to release.

Source code in src/robolens/embodiment.py
def close(self) -> None:  # noqa: B027 - intentional no-op default
    """Default: nothing to release."""

Tasks & scenes

scene

Scenes — the robotics analog of Inspect AI's Sample.

A Scene is one initial condition of a benchmark: a language instruction, an optional success Target, an optional seed, and metadata. A benchmark Task iterates over a dataset of scenes (e.g. 50 object layouts), repeated epochs times.

Field mapping to Inspect: Sample(input, target, id, metadata, setup) → Scene(instruction, target, id, metadata, setup, init_seed).

Target dataclass

Target(kind: str, spec: Mapping[str, Any] = dict())

A success specification the scorer reads. Embodiment-namespaced.

kind names what the embodiment must realize/evaluate (e.g. "reach_object"); spec carries the parameters. Kept intentionally open for the tracer; richer typed targets land with the scorer milestone.

Scene dataclass

Scene(id: str, instruction: str, target: Target | None = None, init_seed: int | None = None, setup: str | None = None, metadata: Mapping[str, Any] = dict())

One initial condition of a benchmark.

ListSceneDataset

ListSceneDataset(scenes: Sequence[Scene])

A trivial in-memory scene dataset backed by a sequence.

Source code in src/robolens/scene.py
def __init__(self, scenes: Sequence[Scene]):
    self._scenes = list(scenes)

task

The Task — an embodiment-agnostic benchmark definition.

Mirrors Inspect AI's Task = dataset + scorer + epochs/reducer, adapted for robotics: the dataset is a sequence of Scene initial conditions and the rollout horizon (max_steps) and control rate live here.

Epochs dataclass

Epochs(count: int = 1, reducer: str = 'mean')

Repeat count plus the reducer used to combine per-epoch scores.

Mirrors Inspect's Epochs(count, reducer); reducer is a registered name (default "mean").

Task dataclass

Task(name: str, scenes: Sequence[Scene], scorer: Scorer | Sequence[Scorer], max_steps: int, epochs: int | Epochs = 1, control_hz: float | None = None, metadata: Mapping[str, Any] = dict())

A benchmark: scenes + scorer(s) + horizon, independent of any embodiment.

Scoring

scorer

Scoring: Scores, the Scorer protocol, epoch reducers, and builtin scorers.

Mirrors Inspect AI's @scorer/reducer split. A scorer maps a recorded trajectory (+ the scene's Target) to a Score; an epoch reducer collapses the per-epoch scores of one scene into a single score before metrics aggregate across scenes.

Scorers consume the recorded trajectory (not a live environment), so scoring is reproducible from a saved log.
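
The two-stage aggregation described above (reduce epochs within a scene, then aggregate across scenes) can be sketched with plain numbers. The scene ids, score values, and the `reduce_mean` helper are made up for illustration; the real reducers operate on Score objects.

```python
from statistics import mean

# Hypothetical per-epoch scores: scene id -> one value per epoch.
epoch_scores = {
    "scene-a": [1.0, 1.0, 0.0, 0.0],
    "scene-b": [1.0, 0.0, 0.0, 0.0],
}


def reduce_mean(values):
    """Epoch reducer: collapse one scene's epochs into a single number."""
    return mean(values)


# Stage 1: reduce within each scene.
per_scene = {sid: reduce_mean(vals) for sid, vals in epoch_scores.items()}

# Stage 2: aggregate the reduced scores across scenes into a metric.
success_rate = mean(list(per_scene.values()))
```

Reducing per scene first means every scene contributes equally to the metric, regardless of how many epochs it ran.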

Score dataclass

Score(value: ScoreValue, explanation: str | None = None, metadata: Mapping[str, Any] = dict())

The outcome a scorer assigns to one trajectory.

Scorer

Bases: Protocol

Maps a recorded trajectory + scene target to a Score.

VLMScorer

Reserved interface (R10): score from a VLM classifier over final frames.

Implemented in a later milestone; instantiating and calling it raises so the contract is visible but no half-baked behavior ships.

value_to_float

value_to_float(value: ScoreValue) -> float

Coerce a score value to a float for metric aggregation.

Source code in src/robolens/scorer.py
def value_to_float(value: ScoreValue) -> float:
    """Coerce a score value to a float for metric aggregation."""
    if isinstance(value, bool):
        return 1.0 if value else 0.0
    if isinstance(value, int | float):
        return float(value)
    try:
        return float(value)
    except ValueError:
        return 0.0

reduce_mode

reduce_mode(scores: Sequence[Score]) -> Score

Most common raw value (works for categorical scores). Deterministic.

Source code in src/robolens/scorer.py
def reduce_mode(scores: Sequence[Score]) -> Score:
    """Most common raw value (works for categorical scores). Deterministic."""
    values = [s.value for s in scores]
    counts = Counter(values)
    best = max(values, key=lambda v: (counts[v], str(v)))
    return Score(value=best)
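
The tie-breaking rule in the `max` key is easy to miss: when two values are equally common, the one whose `str()` sorts last wins. A small sketch of the same selection rule on raw values (the `mode_value` helper is hypothetical and skips the Score wrapper):

```python
from collections import Counter


def mode_value(values):
    """Same selection rule as the listing above, applied to raw values."""
    counts = Counter(values)
    # Primary key: frequency. Secondary key: string form, which breaks ties.
    return max(values, key=lambda v: (counts[v], str(v)))


clear_winner = mode_value(["collision", "success", "success"])
tie = mode_value(["collision", "success", "collision", "success"])
```

Because ties are resolved by string comparison, the result does not depend on input order, which is what makes the reducer deterministic. Values must be hashable, and an empty sequence makes `max` raise `ValueError`.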

pass_at_k

pass_at_k(k: int) -> Reducer

Unbiased pass@k estimator over the epoch scores (success = value >= 0.5).

Source code in src/robolens/scorer.py
def pass_at_k(k: int) -> Reducer:
    """Unbiased pass@k estimator over the epoch scores (success = value >= 0.5)."""
    if k < 1:
        raise ValueError("k must be >= 1")

    def reducer(scores: Sequence[Score]) -> Score:
        n = len(scores)
        c = sum(1 for s in scores if _numeric(s.value) >= 0.5)
        if k > n:
            raise ValueError(f"pass_at_{k} needs at least {k} epochs, got {n}")
        # 1 - C(n-c, k) / C(n, k): probability that at least one of k draws is correct.
        value = 1.0 - (comb(n - c, k) / comb(n, k) if n - c >= k else 0.0)
        return Score(value=value)

    return reducer
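
To make the estimator concrete, here is the same formula on bare counts, where n is the number of epochs and c the number of successes. `pass_at_k_value` is a hypothetical helper, not part of the library. It relies on `math.comb` returning 0 when k exceeds n - c, so no special case is needed.

```python
from math import comb


def pass_at_k_value(n: int, c: int, k: int) -> float:
    """1 - C(n-c, k) / C(n, k): chance that at least one of k draws succeeds."""
    if not 1 <= k <= n:
        raise ValueError("need 1 <= k <= n")
    # comb(n - c, k) is 0 when fewer than k failures exist, giving 1.0.
    return 1.0 - comb(n - c, k) / comb(n, k)


# 5 epochs, 2 successes: C(3, 2) / C(5, 2) = 3 / 10 of the pairs are all-failure.
two_draws = pass_at_k_value(n=5, c=2, k=2)
one_draw = pass_at_k_value(n=5, c=2, k=1)   # reduces to c / n
```

With k = 1 the estimator is just the empirical success rate; larger k rewards a policy that succeeds at least occasionally.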

success_at_end

success_at_end() -> Scorer

Score 1.0 iff the episode terminated with reason "success".

Source code in src/robolens/scorer.py
def success_at_end() -> Scorer:
    """Score 1.0 iff the episode terminated with reason ``"success"``."""
    return _SuccessAtEnd()

episode_length

episode_length() -> Scorer

Score = number of environment steps taken.

Source code in src/robolens/scorer.py
def episode_length() -> Scorer:
    """Score = number of environment steps taken."""
    return _EpisodeLength()

min_distance_to_goal

min_distance_to_goal() -> Scorer

Score = the closest the effector got to the goal (lower is better).

Source code in src/robolens/scorer.py
def min_distance_to_goal() -> Scorer:
    """Score = the closest the effector got to the goal (lower is better)."""
    return _MinDistanceToGoal()

reached_goal_state

reached_goal_state(threshold: float = 0.05) -> Scorer

Success iff the effector came within threshold of the goal.

Source code in src/robolens/scorer.py
def reached_goal_state(threshold: float = 0.05) -> Scorer:
    """Success iff the effector came within ``threshold`` of the goal."""
    return _ReachedGoalState(threshold=threshold)

operator_scorer

operator_scorer() -> Scorer

Score from the human operator's recorded success judgement (R6).

Source code in src/robolens/scorer.py
def operator_scorer() -> Scorer:
    """Score from the human operator's recorded success judgement (R6)."""
    return _OperatorScorer()

Rollout, controllers & safety

rollout

The rollout engine — the closed control loop at the heart of RoboLens.

One rollout runs a single trial (one scene, one epoch): it drives the policy↔embodiment loop through the Controller (open-loop chunk execution) and the Approver safety gate, logging each step to the sinks, and returns an immutable TrialRecord that scorers consume.

StepRecord dataclass

StepRecord(t: int, observation: Observation, action: Action, result: StepResult, image_refs: Mapping[str, FrameRef] | None = None)

One step of a recorded trajectory.

When a FrameStore is used, observation has its images stripped and image_refs holds on-disk handles instead (R5).

TrialRecord dataclass

TrialRecord(scene_id: str, epoch: int, seed: int | None, steps: list[StepRecord] = list(), terminated: bool = False, truncated: bool = False, termination_reason: str | None = None, status: str = 'success', error: str | None = None, inference_latencies: list[float] = list(), operator_judgement: str | None = None, events: list[Event] = list())

The full record of one trial — the unit scorers consume.

derive_seed

derive_seed(eval_seed: int | None, scene_seed: int | None, epoch: int) -> int

Deterministically combine eval/scene seeds and the epoch index (R2).

Distinct epochs of the same scene get distinct seeds so repeats actually vary for stochastic policies, while a fixed (eval_seed, scene_seed, epoch) reproduces bitwise.

Source code in src/robolens/rollout.py
def derive_seed(eval_seed: int | None, scene_seed: int | None, epoch: int) -> int:
    """Deterministically combine eval/scene seeds and the epoch index (R2).

    Distinct epochs of the same scene get distinct seeds so repeats actually vary
    for stochastic policies, while a fixed ``(eval_seed, scene_seed, epoch)``
    reproduces bitwise.
    """
    payload = f"{eval_seed or 0}:{scene_seed or 0}:{epoch}".encode()
    return zlib.crc32(payload) & 0xFFFFFFFF
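
A quick way to see the properties claimed in the docstring is to call the function with a few inputs. The sketch repeats the two-line body so it runs standalone. It also shows one consequence of `or 0` that is worth knowing: a seed of `None` and a seed of `0` derive the same value.

```python
import zlib


def derive_seed(eval_seed, scene_seed, epoch):
    """Copy of the listing above so the sketch runs standalone."""
    payload = f"{eval_seed or 0}:{scene_seed or 0}:{epoch}".encode()
    return zlib.crc32(payload) & 0xFFFFFFFF


# Three epochs of the same scene get three different seeds.
seeds = [derive_seed(0, 7, epoch) for epoch in range(3)]

# The same triple always maps to the same seed.
repeatable = derive_seed(0, 7, 1) == seeds[1]

# `None or 0` evaluates to 0, so None and 0 are indistinguishable here.
none_equals_zero = derive_seed(None, 7, 0) == derive_seed(0, 7, 0)
```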

rollout

rollout(policy: Policy, embodiment: Embodiment, scene: Scene, *, max_steps: int, seed: int | None, epoch: int, controller: Controller, approver: Approver, sink: LogSink, control_hz: float | None = None, frame_store: FrameStore | None = None) -> TrialRecord

Run a single trial and return its record.

Generic exceptions raised by the policy are wrapped as PolicyError; by the embodiment as EmbodimentFault. Already-typed RoboLens errors (incl. SafetyAbort) propagate unchanged, so the eval orchestrator can apply the correct continue-vs-halt policy.

Source code in src/robolens/rollout.py
def rollout(
    policy: Policy,
    embodiment: Embodiment,
    scene: Scene,
    *,
    max_steps: int,
    seed: int | None,
    epoch: int,
    controller: Controller,
    approver: Approver,
    sink: LogSink,
    control_hz: float | None = None,
    frame_store: FrameStore | None = None,
) -> TrialRecord:
    """Run a single trial and return its record.

    Generic exceptions raised by the policy are wrapped as
    [`PolicyError`][robolens.errors.PolicyError]; by the embodiment as
    [`EmbodimentFault`][robolens.errors.EmbodimentFault]. Already-typed RoboLens errors
    (incl. [`SafetyAbort`][robolens.errors.SafetyAbort]) propagate unchanged, so the
    eval orchestrator can apply the correct continue-vs-halt policy.
    """
    trial_id = f"{scene.id}-e{epoch}"
    record = TrialRecord(scene_id=scene.id, epoch=epoch, seed=seed)
    record.events.append(reset_event(seed))
    store: dict[str, Any] = {}

    policy.reset(scene)
    obs = embodiment.reset(scene, seed=seed)

    t = 0
    while t < max_steps:
        prev_inferences = len(store.get("_controller_inferences", []))
        try:
            action = controller.next_action(policy, obs, t, store)
        except RoboLensError:
            raise
        except Exception as exc:
            record.events.append(error_event(t, "PolicyError", str(exc)))
            raise PolicyError(str(exc)) from exc

        inferences = store.get("_controller_inferences", [])
        if len(inferences) > prev_inferences:
            latency, chunk_len = inferences[-1]
            record.events.append(inference_event(t, latency, chunk_len))

        action = approver.review(action, store)  # may raise SafetyAbort

        try:
            result: StepResult = embodiment.step(action)
        except RoboLensError:
            raise
        except Exception as exc:
            record.events.append(error_event(t, "EmbodimentFault", str(exc)))
            raise EmbodimentFault(str(exc)) from exc

        sink.log_step(t, obs, action, result)
        obs_rec, refs = _store_frames(frame_store, trial_id, t, obs)
        record.steps.append(
            StepRecord(t=t, observation=obs_rec, action=action, result=result, image_refs=refs)
        )
        record.events.append(
            step_event(t, result.terminated, result.truncated, result.termination_reason)
        )
        t += 1

        if result.terminated:
            record.terminated = True
            record.termination_reason = result.termination_reason
            break
        if result.truncated:
            record.truncated = True
            record.termination_reason = result.termination_reason or "truncated"
            break
        obs = result.observation
    else:
        record.truncated = True
        record.termination_reason = "max_steps"

    record.inference_latencies = [
        lat for lat, _ in store.get("_controller_inferences", []) if lat is not None
    ]
    # ``control_hz`` / SELF_PACED are wired here; real-time pacing (sleep) is added
    # with a real-robot adapter so the test suite stays fast.
    _ = _effective_control_hz(None, control_hz, embodiment.info.control_hz)
    _ = SELF_PACED
    return record

controller

Controllers — the rollout middleware layer (Inspect's @solver analog).

A Controller owns the per-control-step decision of which action to send to the embodiment. It internally decides when to call policy.act() (a slow VLA inference returning an ActionChunk), buffers the returned chunk, and pops the next action each step. This single-method, stateful shape (R3) is what lets advanced controllers — e.g. a temporal-ensembling controller that re-infers every step and blends overlapping predictions — compose without forking the rollout loop.

DefaultController plays the first replan_interval actions of each chunk, then re-infers (replan_interval=None ⇒ play the whole chunk before replanning).
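
The replanning rule can be sketched without any RoboLens types. Everything below is illustrative: `run_open_loop` and `toy_infer` are hypothetical names, and the toy policy labels each action with the step at which it was inferred so the replanning points are visible in the output.

```python
from collections import deque


def run_open_loop(infer, replan_interval, max_steps):
    """Play the first `replan_interval` actions of each chunk, then re-infer."""
    buffer = deque()
    executed = []
    for t in range(max_steps):
        if not buffer:
            chunk = infer(t)  # one (slow) policy inference
            keep = len(chunk) if replan_interval is None else replan_interval
            buffer.extend(chunk[:keep])
        executed.append(buffer.popleft())
    return executed


def toy_infer(t, horizon=4):
    # Hypothetical policy: each action is (inference step, index in chunk).
    return [(t, i) for i in range(horizon)]


whole_chunk = run_open_loop(toy_infer, None, 6)  # replan after 4 actions
replanned = run_open_loop(toy_infer, 2, 6)       # replan after 2 actions
```

A smaller replan_interval trades more inference calls for fresher observations; the unused tail of each chunk is simply discarded.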

Controller

Bases: Protocol

Decides the next action to execute, calling the policy as needed.

DefaultController

DefaultController(replan_interval: int | None = None)

Open-loop chunk execution with periodic replanning.

Source code in src/robolens/controller.py
def __init__(self, replan_interval: int | None = None):
    if replan_interval is not None and replan_interval < 1:
        raise ValueError("replan_interval must be >= 1 or None")
    self.replan_interval = replan_interval

SmoothingController

SmoothingController(inner: Controller, alpha: float = 0.5)

Wrap another controller and exponentially smooth its action stream.

Demonstrates the middleware composition the single-method interface enables: the wrapped controller owns inference/replanning while this layer applies an exponential moving average (alpha toward the new action) on top. Only valid for additive/continuous action spaces (the caller's responsibility).

Source code in src/robolens/controller.py
def __init__(self, inner: Controller, alpha: float = 0.5):
    if not 0.0 < alpha <= 1.0:
        raise ValueError("alpha must be in (0, 1]")
    self.inner = inner
    self.alpha = alpha
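
The smoothing step itself is a standard exponential moving average. A minimal sketch on plain lists follows; it assumes the first action passes through unchanged, which the text above does not state, and `ema_stream` is a hypothetical helper rather than the controller's actual method.

```python
def ema_stream(actions, alpha=0.5):
    """Smooth a sequence of action vectors: new = alpha * a + (1 - alpha) * prev."""
    if not 0.0 < alpha <= 1.0:
        raise ValueError("alpha must be in (0, 1]")
    smoothed = []
    prev = None
    for action in actions:
        if prev is None:
            prev = list(action)  # assumption: nothing to blend with yet
        else:
            prev = [alpha * a + (1.0 - alpha) * p for a, p in zip(action, prev)]
        smoothed.append(prev)
    return smoothed


out = ema_stream([[0.0, 0.0], [1.0, 2.0], [1.0, 2.0]], alpha=0.5)
```

With alpha = 1.0 the stream is returned unchanged; smaller values respond more slowly to a step change in the commanded action.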

EnsemblingController

EnsemblingController(action_space: Box, m: float = 0.1)

ACT/ALOHA-style temporal ensembling over overlapping action chunks.

Queries the policy every control step and blends, for the current step, the predictions of all still-relevant recent chunks. A chunk queried at global step q predicts step t via its action at index t - q (valid while 0 <= t - q < len(chunk)). Predictions are weighted exp(-m * i) with i = 0 for the oldest contributing chunk (ALOHA's convention: older predictions dominate, which smooths motion); larger m favors the oldest.

Only valid for additive action representations: the constructor refuses rotation reps and binary grippers that cannot be linearly averaged (R8).
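
The weighting scheme can be sketched on plain lists. The layout below (a dict from query step to the chunk predicted at that step) and the `ensemble` helper are illustrative assumptions, as is normalizing the weights to sum to 1; the controller's internal bookkeeping may differ.

```python
import math


def ensemble(chunks, t, m=0.1):
    """Blend every chunk's prediction for step t, weighting older chunks more."""
    preds = []
    for q in sorted(chunks):  # oldest query first, so index i = 0 is the oldest
        idx = t - q
        if 0 <= idx < len(chunks[q]):
            preds.append(chunks[q][idx])
    if not preds:
        raise ValueError(f"no chunk covers step {t}")
    weights = [math.exp(-m * i) for i in range(len(preds))]
    total = sum(weights)  # assumption: weights are normalized
    dim = len(preds[0])
    return [sum(w * p[d] for w, p in zip(weights, preds)) / total for d in range(dim)]


# Chunk queried at step 0 predicts steps 0..2; chunk queried at step 1 predicts 1..3.
chunks = {0: [[0.0], [1.0], [2.0]], 1: [[10.0], [11.0], [12.0]]}
equal_blend = ensemble(chunks, t=1, m=0.0)     # plain average of 1.0 and 10.0
oldest_wins = ensemble(chunks, t=1, m=50.0)    # almost entirely the step-0 chunk
```

Setting m = 0 gives a uniform average over all contributing chunks, while a large m effectively replays the oldest chunk that is still valid.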

Source code in src/robolens/controller.py
def __init__(self, action_space: Box, m: float = 0.1):
    if m < 0:
        raise ValueError("m must be >= 0")
    self.action_space = action_space
    self.m = m
    sem = action_space.semantics
    if sem is None:
        global _ENSEMBLE_WARNED
        if not _ENSEMBLE_WARNED:
            warnings.warn(
                "EnsemblingController: action space has no semantics; cannot "
                "verify that actions are safe to average.",
                RuntimeWarning,
                stacklevel=2,
            )
            _ENSEMBLE_WARNED = True
    else:
        if sem.control_mode not in _AVERAGEABLE_MODES:  # pragma: no cover
            # Defensive: every valid ControlMode literal is currently averageable.
            raise ValueError(
                f"EnsemblingController cannot average control_mode {sem.control_mode!r}"
            )
        if sem.rotation_repr not in _AVERAGEABLE_ROT:
            raise ValueError(
                f"EnsemblingController cannot linearly average rotation_repr "
                f"{sem.rotation_repr!r}; only {sorted(_AVERAGEABLE_ROT)} are safe"
            )
        if sem.gripper == "binary":
            raise ValueError(
                "EnsemblingController cannot average a binary gripper; threshold "
                "it downstream or use a continuous gripper"
            )

approver

The Approver — a safety gate between policy output and the embodiment.

Every action passes through Approver.review before embodiment.step. This is the robotics analog of Inspect AI's ApprovalPolicy and is more safety-critical: an approver may pass, clamp, or veto an action (a veto raises SafetyAbort). In the tracer slice the default approver passes everything through; clamping/operator approval land in rollout hardening.

Approver

Bases: Protocol

Reviews an action before it reaches the embodiment.

May return the action unchanged, return a modified (e.g. clamped) action, or raise SafetyAbort to halt the eval.

AutoApprover

Approve every action unchanged (the permissive default).

ClampApprover

ClampApprover(action_space: Box)

Clamp actions to a box's low/high bounds before they reach hardware.

A modified action is flagged via action.meta["clamped"] so the rollout can record an approval event.

Source code in src/robolens/approver.py
def __init__(self, action_space: Box):
    self._space = action_space
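
The clamping behavior can be illustrated on plain lists. This is a sketch under assumptions: `clamp_action` is a hypothetical helper, and setting the `"clamped"` flag to `False` for untouched actions is a guess (the text above only says modified actions are flagged).

```python
def clamp_action(data, low, high):
    """Clip each component into [low, high] and report whether anything changed."""
    clamped = [min(max(x, lo), hi) for x, lo, hi in zip(data, low, high)]
    meta = {"clamped": clamped != list(data)}
    return clamped, meta


low, high = [-1.0] * 3, [1.0] * 3
values, meta = clamp_action([0.5, 2.0, -3.0], low, high)
untouched, untouched_meta = clamp_action([0.1, 0.2, 0.3], low, high)
```

Recording the flag rather than silently clipping keeps the transcript honest about how often the policy asked for out-of-range actions.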

frames

FrameStore — rollout-owned streaming of camera frames to disk (R5).

A long multi-camera episode would exhaust memory if every frame were retained in the TrialRecord. Instead the rollout streams frames to disk through a FrameStore and keeps only lightweight FrameRef handles. This is owned by the rollout, NOT by any log sink, so trajectories are recorded (and scorable) independent of which optional sinks are enabled.

FrameRef dataclass

FrameRef(camera: str, t: int, path: str)

A handle to a camera frame stored on disk.

FrameStore

FrameStore(root: str)

Persist frames as .npy files under root and hand back refs.

Source code in src/robolens/frames.py
def __init__(self, root: str):
    self.root = Path(root)
    self.root.mkdir(parents=True, exist_ok=True)
    self.count = 0

transcript

A typed transcript of rollout events.

Each trial records an ordered stream of events (reset, inference, step, approval, operator judgement, error). This is the robotics analog of Inspect AI's transcript and is the data a results viewer renders. Events are deliberately lightweight: a kind, the step index t (-1 for pre-loop events), and a small data payload.

Event dataclass

Event(kind: EventKind, t: int, data: Mapping[str, Any] = dict())

One entry in a trial's transcript.

Compatibility & errors

compat

Compatibility checking between a policy and an embodiment.

Before any rollout, RoboLens verifies that a (policy, embodiment) pair can actually run together: the action spaces agree in dimension and semantics, the embodiment provides every observation the policy requires (resolving a name remap), the control rates are reconcilable (R1), and — given a task — every scene is realizable on the embodiment (R7).

Hard mismatches are error issues that fail fast; soft ones are warnings.

CompatibilityReport dataclass

CompatibilityReport(issues: list[CompatIssue] = list(), remap: dict[str, str] = dict())

The outcome of a compatibility check.

check_compatibility

check_compatibility(policy: Policy, embodiment: Embodiment, task: Task | None = None, *, remap: dict[str, str] | None = None) -> CompatibilityReport

Return a structured compatibility report (does not raise).

Source code in src/robolens/compat.py
def check_compatibility(
    policy: Policy,
    embodiment: Embodiment,
    task: Task | None = None,
    *,
    remap: dict[str, str] | None = None,
) -> CompatibilityReport:
    """Return a structured compatibility report (does not raise)."""
    remap = dict(remap or {})
    report = CompatibilityReport(remap=remap)
    issues = report.issues

    _check_action_spaces(policy.info.action_space, embodiment.info.action_space, issues)

    pobs = policy.info.observation_space
    eobs = embodiment.info.observation_space
    _resolve_keys(pobs.camera_names, eobs.camera_names, remap, "camera", issues)
    _resolve_keys(pobs.state_keys, eobs.state_keys, remap, "state", issues)

    # Control-rate reconciliation (R1): only warn, since the framework paces.
    p_hz = getattr(policy.info, "control_hz", None)
    e_hz = embodiment.info.control_hz
    if p_hz is not None and e_hz is not None and abs(p_hz - e_hz) > _RATE_TOL:
        issues.append(
            CompatIssue(
                "warning",
                "control_rate",
                f"policy desires {p_hz} Hz but embodiment runs at {e_hz} Hz; "
                "framework will pace to the effective rate",
            )
        )

    if task is not None:
        _check_scenes_realizable(task, embodiment, issues)

    return report

assert_compatible

assert_compatible(policy: Policy, embodiment: Embodiment, task: Task | None = None, *, remap: dict[str, str] | None = None) -> CompatibilityReport

Check compatibility and raise CompatibilityError on hard errors.

Source code in src/robolens/compat.py
def assert_compatible(
    policy: Policy,
    embodiment: Embodiment,
    task: Task | None = None,
    *,
    remap: dict[str, str] | None = None,
) -> CompatibilityReport:
    """Check compatibility and raise [`CompatibilityError`][robolens.errors.CompatibilityError] on
    hard errors."""
    report = check_compatibility(policy, embodiment, task, remap=remap)
    report.raise_for_errors()
    return report

errors

RoboLens error taxonomy.

The split below resolves the "fail fast vs never-crash-overnight" tension:

  • ConfigError / CompatibilityError are raised before any rollout — bad configuration should fail loudly and immediately.
  • PolicyError is recorded as a failed trial; whether it aborts the eval is governed by fail_on_error (Inspect semantics).
  • EmbodimentFault and SafetyAbort always halt the eval regardless of fail_on_error — a faulted or unsafe robot must never auto-advance to the next scene unattended.
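
The continue-versus-halt rule in the list above can be written as a small dispatch over the exception type. The classes here are local stand-ins that mirror the documented names so the sketch runs on its own, and `halts_eval` is a hypothetical helper that only handles a boolean fail_on_error.

```python
class RoboLensError(Exception):
    """Local stand-in for the documented base class."""


class PolicyError(RoboLensError):
    """Local stand-in: the policy raised during inference."""


class EmbodimentFault(RoboLensError):
    """Local stand-in: the robot or simulator faulted."""


class SafetyAbort(RoboLensError):
    """Local stand-in: an approver vetoed an action."""


def halts_eval(exc: RoboLensError, fail_on_error: bool = False) -> bool:
    """Decide whether the eval stops after `exc` was raised in a trial."""
    if isinstance(exc, (EmbodimentFault, SafetyAbort)):
        return True  # never auto-advance past a faulted or unsafe robot
    if isinstance(exc, PolicyError):
        return fail_on_error  # the trial is recorded as failed either way
    return True  # assumption: any other RoboLens error is treated as fatal
```

The asymmetry is deliberate: a policy bug costs one trial, while a hardware fault or safety veto needs a human before anything moves again.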

RoboLensError

Bases: Exception

Base class for all RoboLens errors.

ConfigError

Bases: RoboLensError

Invalid task / policy / embodiment configuration. Fail fast.

CompatibilityError

Bases: RoboLensError

A policy and embodiment are not compatible. Fail fast, before any rollout.

PolicyError

Bases: RoboLensError

The policy raised during inference. Recorded as a failed trial.

EmbodimentFault

Bases: RoboLensError

The embodiment/robot faulted. Always halts the eval and requires a human.

SafetyAbort

Bases: RoboLensError

An approver vetoed an action / e-stop. Always halts the eval.

Evaluation & logs

eval

The eval() entry point — orchestrates scenes x epochs into an EvalLog.

Mirrors Inspect AI's eval(): it runs a task's scenes (repeated over epochs), scores each recorded trajectory, reduces epochs, aggregates metrics, and returns a list of immutable EvalLog objects (one per task). The tracer slice accepts already-constructed objects; registry-string resolution (policy="openvla/7b") is layered on with the registry milestone.

eval

eval(task: Task | str, policy: Policy | str, embodiment: Embodiment | str, *, log_dir: str = 'logs', sinks: list[LogSink] | None = None, seed: int | None = 0, fail_on_error: bool | float = False, controller: Controller | None = None, approver: Approver | None = None, remap: dict[str, str] | None = None, store_frames: bool = False) -> list[EvalLog]

Run task with policy on embodiment; return [EvalLog].

task/policy/embodiment may be objects or registry names (e.g. policy="scripted"), resolved through the registry — the Inspect-style ergonomic that keeps logs and the CLI reproducible.

fail_on_error follows Inspect semantics for PolicyError (True = fail on first, False = never, 0<x<1 = proportion, x>1 = count). EmbodimentFault/SafetyAbort always halt regardless.

When store_frames is set, camera frames are streamed to <log_dir>/frames as binary side-cars (R5) rather than kept in memory.

Raises CompatibilityError (fail fast, before any rollout) if the policy and embodiment are incompatible.
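
The four fail_on_error modes are easiest to compare as a single predicate over the running PolicyError count. This is a sketch, not the orchestrator's code: `policy_errors_exceed` is a hypothetical helper, and the strict "more than" comparisons and the handling of exactly 1.0 as a count are assumptions about boundary cases the description leaves open.

```python
def policy_errors_exceed(fail_on_error, error_count, total_trials):
    """Return True when accumulated PolicyErrors should abort the eval."""
    # Check bool first: bool is a subclass of int, so True would otherwise
    # be read as the count threshold 1.
    if isinstance(fail_on_error, bool):
        return fail_on_error and error_count > 0
    if 0 < fail_on_error < 1:
        # Proportion of all trials (assumption: strictly greater than).
        return total_trials > 0 and error_count / total_trials > fail_on_error
    # Otherwise an absolute count (assumption: strictly greater than).
    return error_count > fail_on_error


halt_on_first = policy_errors_exceed(True, 1, 10)
never_halt = policy_errors_exceed(False, 5, 10)
over_proportion = policy_errors_exceed(0.2, 3, 10)
over_count = policy_errors_exceed(5, 6, 100)
```

EmbodimentFault and SafetyAbort bypass this check entirely, since they halt the eval regardless of the setting.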

Source code in src/robolens/eval.py
def eval(
    task: Task | str,
    policy: Policy | str,
    embodiment: Embodiment | str,
    *,
    log_dir: str = "logs",
    sinks: list[LogSink] | None = None,
    seed: int | None = 0,
    fail_on_error: bool | float = False,
    controller: Controller | None = None,
    approver: Approver | None = None,
    remap: dict[str, str] | None = None,
    store_frames: bool = False,
) -> list[EvalLog]:
    """Run ``task`` with ``policy`` on ``embodiment``; return ``[EvalLog]``.

    ``task``/``policy``/``embodiment`` may be objects or **registry names**
    (e.g. ``policy="scripted"``), resolved through the registry — the Inspect-style
    ergonomic that keeps logs and the CLI reproducible.

    ``fail_on_error`` follows Inspect semantics for ``PolicyError`` (``True`` =
    fail on first, ``False`` = never, ``0<x<1`` = proportion, ``x>1`` = count).
    ``EmbodimentFault``/``SafetyAbort`` always halt regardless.

    When ``store_frames`` is set, camera frames are streamed to
    ``<log_dir>/frames`` as binary side-cars (R5) rather than kept in memory.

    Raises [`CompatibilityError`][robolens.errors.CompatibilityError] (fail fast, before any
    rollout) if the policy and embodiment are incompatible.
    """
    from robolens.logging.json_log import JsonLogSink
    from robolens.registry import resolve

    task = cast(Task, resolve("task", task)) if isinstance(task, str) else task
    policy = cast(Policy, resolve("policy", policy)) if isinstance(policy, str) else policy
    embodiment = (
        cast(Embodiment, resolve("embodiment", embodiment))
        if isinstance(embodiment, str)
        else embodiment
    )

    # Fail fast on incompatible pairings before touching any hardware/sim.
    assert_compatible(policy, embodiment, task, remap=remap)

    sink_list: list[LogSink] = sinks if sinks is not None else [JsonLogSink(log_dir)]
    bus = _Broadcast(sink_list)
    controller = controller or DefaultController(policy.config.replan_interval)
    approver = approver or AutoApprover()

    frame_store: FrameStore | None = None
    if store_frames:
        frame_store = FrameStore(str(Path(log_dir) / "frames"))

    spec = EvalSpec(
        task=task.name,
        policy=policy.info.name,
        embodiment=embodiment.info.name,
        created=_now_iso(),
        robolens_version=__version__,
        git_commit=_git_commit(),
        policy_config=asdict(policy.config),
        embodiment_info={
            "control_hz": embodiment.info.control_hz,
            "is_simulated": embodiment.info.is_simulated,
            "capabilities": sorted(embodiment.info.capabilities),
        },
        seed=seed,
    )
    bus.on_eval_start(spec)

    started = time.perf_counter()
    started_iso = _now_iso()
    epoch_spec = task.epoch_spec
    scorers = task.scorers

    scene_results: list[SceneResult] = []
    all_latencies: list[float] = []
    total_steps = 0
    total_trials = 0
    status = "success"
    error: str | None = None
    error_count = 0

    halted = False
    for scene in task.scenes:
        per_scorer_scores: dict[str, list[Score]] = {s.name: [] for s in scorers}
        epoch_dicts: list[dict[str, float]] = []
        scene_status = "success"
        scene_error: str | None = None

        for epoch in range(epoch_spec.count):
            trial_seed = derive_seed(seed, scene.init_seed, epoch)
            bus.on_trial_start(scene.id, epoch)
            try:
                record = rollout(
                    policy,
                    embodiment,
                    scene,
                    max_steps=task.max_steps,
                    seed=trial_seed,
                    epoch=epoch,
                    controller=controller,
                    approver=approver,
                    sink=bus,
                    control_hz=task.control_hz,
                    frame_store=frame_store,
                )
            except (EmbodimentFault, SafetyAbort) as exc:
                # Hardware/safety failures always halt the whole eval.
                status = "error"
                error = f"{type(exc).__name__}: {exc}"
                scene_status = "error"
                scene_error = error
                halted = True
                break
            except PolicyError as exc:
                error_count += 1
                scene_status = "error"
                scene_error = f"{type(exc).__name__}: {exc}"
                record = TrialRecord(
                    scene_id=scene.id,
                    epoch=epoch,
                    seed=trial_seed,
                    status="error",
                    error=scene_error,
                )

            total_trials += 1
            total_steps += len(record.steps)
            all_latencies.extend(record.inference_latencies)

            epoch_values: dict[str, float] = {}
            for scorer in scorers:
                score = scorer(record, scene.target)
                per_scorer_scores[scorer.name].append(score)
                epoch_values[scorer.name] = value_to_float(score.value)
            epoch_dicts.append(epoch_values)
            bus.on_trial_end(record)

        reduced = {
            name: value_to_float(reduce_scores(epoch_spec.reducer, scores).value)
            for name, scores in per_scorer_scores.items()
            if scores
        }
        scene_results.append(
            SceneResult(
                scene_id=scene.id,
                status=scene_status,
                reduced=reduced,
                epochs=epoch_dicts,
                error=scene_error,
            )
        )
        if halted or _should_fail(fail_on_error, error_count, total_trials):
            if not halted:
                status = "error"
                error = error or f"fail_on_error threshold exceeded ({error_count} errors)"
            break

    metrics: dict[str, float] = {}
    for scorer in scorers:
        vals = [sr.reduced[scorer.name] for sr in scene_results if scorer.name in sr.reduced]
        if vals:
            metrics[scorer.name] = mean(vals)

    stats = EvalStats(
        started_at=started_iso,
        completed_at=_now_iso(),
        duration_s=time.perf_counter() - started,
        total_steps=total_steps,
        mean_inference_latency_s=(mean(all_latencies) if all_latencies else None),
        frames_dir=str(frame_store.root) if frame_store is not None else None,
    )
    log = EvalLog(
        version=EvalLog.SCHEMA_VERSION,
        status=status,
        eval=spec,
        results=EvalResults(
            total_scenes=len(scene_results),
            total_trials=total_trials,
            metrics=metrics,
        ),
        stats=stats,
        samples=scene_results,
        error=error,
    )
    bus.on_eval_end(log)
    return [log]
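The source above aggregates in two levels: each scene's per-epoch scores are reduced to one value per scorer, and the final metrics are the mean of those per-scene values. A minimal sketch of that shape follows, assuming a mean reducer (the actual reducer comes from epoch_spec.reducer and may differ); reduce_scene and aggregate are hypothetical names.

```python
from __future__ import annotations

from statistics import mean


def reduce_scene(epochs: list[dict[str, float]]) -> dict[str, float]:
    """Collapse one scene's per-epoch scores to one value per scorer (mean assumed)."""
    names = {name for epoch in epochs for name in epoch}
    return {n: mean(e[n] for e in epochs if n in e) for n in sorted(names)}


def aggregate(scenes: dict[str, list[dict[str, float]]]) -> dict[str, float]:
    """Average the per-scene reduced values across scenes, scorer by scorer."""
    reduced = [reduce_scene(epochs) for epochs in scenes.values() if epochs]
    names = {n for r in reduced for n in r}
    return {n: mean(r[n] for r in reduced if n in r) for n in sorted(names)}


scenes = {
    "scene-a": [{"success": 1.0}, {"success": 0.0}],  # two epochs, mean 0.5
    "scene-b": [{"success": 1.0}, {"success": 1.0}],  # two epochs, mean 1.0
}
metrics = aggregate(scenes)  # mean of 0.5 and 1.0
```

Scenes with no scores are skipped rather than counted as zero, matching the `if scores` and `if vals` guards in the source.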

eval_set

eval_set(tasks: Task | str | Sequence[Task | str], policy: Policy | str, embodiment: Embodiment | str, *, log_dir: str = 'logs', seed: int | None = 0, fail_on_error: bool | float = False, controller: Controller | None = None, approver: Approver | None = None, remap: dict[str, str] | None = None, store_frames: bool = False, retry_attempts: int = 0) -> tuple[bool, list[EvalLog]]

Run a set of tasks and return (success, logs) (mirrors Inspect AI).

success is True iff every task's log has status == "success".

Resumption of a partially-completed run (skipping already-finished scenes via a stable run id) is reserved for a follow-up: retry_attempts is accepted now so that call sites will not need to change later, but it is not yet honored.

Source code in src/robolens/eval.py
def eval_set(
    tasks: Task | str | Sequence[Task | str],
    policy: Policy | str,
    embodiment: Embodiment | str,
    *,
    log_dir: str = "logs",
    seed: int | None = 0,
    fail_on_error: bool | float = False,
    controller: Controller | None = None,
    approver: Approver | None = None,
    remap: dict[str, str] | None = None,
    store_frames: bool = False,
    retry_attempts: int = 0,
) -> tuple[bool, list[EvalLog]]:
    """Run a set of tasks and return ``(success, logs)`` (mirrors Inspect AI).

    ``success`` is ``True`` iff every task's log has ``status == "success"``.

    Resumption of a partially-completed run (skipping already-finished scenes via
    a stable run id) is reserved for a follow-up: ``retry_attempts`` is accepted
    now so callers don't get retrofitted, but is not yet honored.
    """
    task_list = [tasks] if isinstance(tasks, Task | str) else list(tasks)
    logs: list[EvalLog] = []
    for task in task_list:
        logs.extend(
            eval(
                task,
                policy,
                embodiment,
                log_dir=log_dir,
                seed=seed,
                fail_on_error=fail_on_error,
                controller=controller,
                approver=approver,
                remap=remap,
                store_frames=store_frames,
            )
        )
    success = all(log.status == "success" for log in logs)
    return success, logs
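Two details of eval_set are easy to miss: a bare string must be wrapped before list() is applied, and success is a plain all() over log statuses. The sketch below isolates both, using hypothetical stand-ins (FakeTask, FakeLog, normalize, all_succeeded) rather than the real RoboLens types.

```python
from dataclasses import dataclass


@dataclass
class FakeTask:
    """Hypothetical stand-in for a Task object."""
    name: str


@dataclass
class FakeLog:
    """Hypothetical stand-in for an EvalLog; only the status field is modeled."""
    status: str


def normalize(tasks: object) -> list:
    # A str is itself a sequence, so it is checked first; otherwise
    # list("cubepick") would split the name into single characters.
    if isinstance(tasks, (FakeTask, str)):
        return [tasks]
    return list(tasks)


def all_succeeded(logs: list) -> bool:
    # all() over an empty list is True, so an empty task set counts as success.
    return all(log.status == "success" for log in logs)
```

Callers that want an empty task set to count as failure would need to check len(logs) themselves.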

log

The immutable evaluation log — RoboLens's reproducible record of a run.

Mirrors Inspect AI's EvalLog: version + status + eval spec + results + stats + per-scene samples + error. Serialized to JSON with a schema version so newer RoboLens always reads older logs (a read-back guarantee enforced by golden tests in a later step).

EvalSpec dataclass

EvalSpec(task: str, policy: str, embodiment: str, created: str, robolens_version: str, git_commit: str | None = None, policy_config: dict[str, Any] = dict(), embodiment_info: dict[str, Any] = dict(), seed: int | None = None)

Top-level identity of an eval: what was run, with what, when.

EvalStats dataclass

EvalStats(started_at: str, completed_at: str, duration_s: float, total_steps: int, mean_inference_latency_s: float | None = None, frames_dir: str | None = None)

Timing and execution statistics for a run.

SceneResult dataclass

SceneResult(scene_id: str, status: str, reduced: dict[str, float] = dict(), epochs: list[dict[str, float]] = list(), error: str | None = None)

Per-scene result: the reduced score(s) plus the raw per-epoch scores.

EvalResults dataclass

EvalResults(total_scenes: int, total_trials: int, metrics: dict[str, float] = dict())

Aggregate results across all scenes.

EvalLog dataclass

EvalLog(version: int, status: str, eval: EvalSpec, results: EvalResults, stats: EvalStats, samples: list[SceneResult] = list(), error: str | None = None)

The full record returned by eval and persisted to disk.

read_eval_log

read_eval_log(path: str) -> EvalLog

Read an EvalLog back from a JSON file on disk.

Source code in src/robolens/log.py
def read_eval_log(path: str) -> EvalLog:
    """Read an [`EvalLog`][robolens.log.EvalLog] back from a JSON file on disk."""
    with Path(path).open(encoding="utf-8") as fh:
        return EvalLog.from_dict(json.load(fh))
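The schema-versioned round trip can be illustrated with a much smaller record. This is a sketch, not the EvalLog implementation: MiniLog, write_log and read_log are hypothetical, and rejecting a log whose version is newer than the reader's is an assumption (the page only promises that newer readers handle older logs).

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path


@dataclass(frozen=True)
class MiniLog:
    """Hypothetical, much-reduced stand-in for EvalLog."""
    SCHEMA_VERSION = 1  # unannotated, so it is a class attribute, not a field

    version: int
    status: str
    metrics: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict) -> "MiniLog":
        # Assumed policy: refuse logs written by a newer schema than we know.
        if data["version"] > cls.SCHEMA_VERSION:
            raise ValueError(f"log schema {data['version']} is newer than supported")
        # Older logs may lack fields added later, so fall back to defaults.
        return cls(
            version=data["version"],
            status=data["status"],
            metrics=dict(data.get("metrics", {})),
        )


def write_log(log: MiniLog, path: Path) -> None:
    path.write_text(json.dumps(asdict(log)), encoding="utf-8")


def read_log(path: Path) -> MiniLog:
    with path.open(encoding="utf-8") as fh:
        return MiniLog.from_dict(json.load(fh))


with tempfile.TemporaryDirectory() as tmp:
    original = MiniLog(MiniLog.SCHEMA_VERSION, "success", {"success": 0.75})
    write_log(original, Path(tmp) / "log.json")
    restored = read_log(Path(tmp) / "log.json")
```

Reading missing keys with a default in from_dict is one way a newer reader can keep accepting logs written before a field existed.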

Logging sinks

sink

The LogSink protocol and a no-op base implementation.

A sink observes a run's lifecycle. The rollout engine and eval() call these hooks in a fixed order: on_eval_start → (per trial: on_trial_start → log_step* → on_trial_end) → on_eval_end.
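The hook order can be made concrete with a sink that only records which hook fired. The hook names and the arguments of on_eval_start, on_trial_start, on_trial_end and on_eval_end follow the calls visible in eval(); the single-argument log_step signature, RecordingSink and the drive helper are assumptions made for this sketch.

```python
class RecordingSink:
    """Hypothetical sink that records the name of each hook as it fires."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def on_eval_start(self, spec) -> None:
        self.calls.append("on_eval_start")

    def on_trial_start(self, scene_id, epoch) -> None:
        self.calls.append("on_trial_start")

    def log_step(self, step) -> None:  # signature assumed
        self.calls.append("log_step")

    def on_trial_end(self, record) -> None:
        self.calls.append("on_trial_end")

    def on_eval_end(self, log) -> None:
        self.calls.append("on_eval_end")


def drive(sink, *, trials: int, steps_per_trial: int) -> None:
    """Fire the hooks in the documented order, with placeholder payloads."""
    sink.on_eval_start(None)
    for epoch in range(trials):
        sink.on_trial_start("scene-0", epoch)
        for step in range(steps_per_trial):
            sink.log_step(step)
        sink.on_trial_end(None)
    sink.on_eval_end(None)


sink = RecordingSink()
drive(sink, trials=2, steps_per_trial=2)
```

After the call, sink.calls holds one on_eval_start, two trial brackets with two log_step entries each, and one on_eval_end.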

LogSink

Bases: Protocol

Observes the lifecycle of an evaluation run.

NullSink

A sink that does nothing — a convenient base for partial implementations.

json_log

The canonical JSON eval-log sink.

Writes the immutable EvalLog to log_dir once the run finishes. The write is atomic (temp file + os.replace) so an interrupted overnight run never leaves a half-written log.
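The temp file + os.replace pattern mentioned above looks roughly like this. It is a generic sketch of the technique, not JsonLogSink's code: atomic_write_json is a hypothetical helper, and the fsync call is an extra durability step the page does not mention.

```python
import contextlib
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, payload: dict) -> None:
    """Write payload to path so readers see the old file or the new one, never a partial."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the destination directory so that os.replace
    # stays on one filesystem, which is what makes the rename atomic.
    fd, tmp_name = tempfile.mkstemp(
        dir=str(path.parent), prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
            fh.flush()
            os.fsync(fh.fileno())  # assumption: push bytes to disk before renaming
        os.replace(tmp_name, path)
    except BaseException:
        # On any failure, remove the temp file and leave the old log untouched.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "logs" / "eval.json"
    atomic_write_json(target, {"status": "started"})
    atomic_write_json(target, {"status": "success"})
    final = json.loads(target.read_text(encoding="utf-8"))
    leftovers = [p.name for p in target.parent.iterdir() if p.name.endswith(".tmp")]
```

If the process dies before os.replace, the destination still holds the previous complete file (or nothing), and only a stray .tmp file is left behind.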

JsonLogSink

JsonLogSink(log_dir: str)

Persist the final EvalLog as JSON. Step events are only counted, not persisted.

Source code in src/robolens/logging/json_log.py
def __init__(self, log_dir: str):
    self.log_dir = Path(log_dir)
    self.path: Path | None = None
    self._steps = 0

rerun_sink

Optional Rerun visualization sink.

Logs camera images, proprioception, action vectors, and success markers to a Rerun (https://github.com/rerun-io/rerun) recording. rerun-sdk is imported lazily inside methods so the core package never depends on it; if it is not installed, the sink warns once and becomes a no-op (so unattended runs and the core-only import gate are unaffected).

Install with pip install "robolens[rerun]".
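The lazy-import, warn-once, then no-op behavior can be sketched generically. OptionalBackendSink and its _backend helper are hypothetical and do not reproduce RerunSink; the demo deliberately uses a module name that does not exist so the fallback path runs.

```python
import importlib
import warnings


class OptionalBackendSink:
    """Hypothetical sink whose backend module is imported on first use."""

    def __init__(self, module_name: str = "rerun") -> None:
        self.module_name = module_name
        self._mod = None
        self._unavailable = False

    def _backend(self):
        # Import inside the method so that importing this class never
        # requires the optional dependency.
        if self._mod is None and not self._unavailable:
            try:
                self._mod = importlib.import_module(self.module_name)
            except ImportError:
                self._unavailable = True  # remember, so the warning fires once
                warnings.warn(
                    f"{self.module_name} is not installed; sink is a no-op",
                    RuntimeWarning,
                    stacklevel=2,
                )
        return self._mod

    def log_step(self, step) -> bool:
        backend = self._backend()
        if backend is None:
            return False  # no-op path: nothing is logged
        # A real sink would forward `step` to the backend here.
        return True


sink = OptionalBackendSink("robolens_sketch_missing_backend")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    results = [sink.log_step({}), sink.log_step({})]
```

Both calls return False, but only the first one emits a warning.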

RerunSink

RerunSink(recording_path: str | None = None, *, application_id: str = 'robolens', spawn: bool = False)

Stream a rollout to a Rerun recording (.rrd) or a live viewer.

Source code in src/robolens/logging/rerun_sink.py
def __init__(
    self,
    recording_path: str | None = None,
    *,
    application_id: str = "robolens",
    spawn: bool = False,
):
    self.recording_path = recording_path
    self.application_id = application_id
    self.spawn = spawn
    self._rr: Any | None = None
    self._t = 0

Registry & CLI

registry

Registry and decorators for tasks, policies, embodiments, scorers, and sinks.

Mirrors Inspect AI's extension model: components register by name via decorators and are resolved from strings (so eval(policy="scripted") and the CLI work). Out-of-tree packages publish components through importlib.metadata entry-point groups, so an installed robolens-openvla appears in robolens list without being imported first.

Entry-point groups: robolens.tasks, robolens.policies, robolens.embodiments, robolens.scorers, robolens.sinks.

register

register(kind: Kind, name: str | None = None) -> Callable[[F], F]

Register a factory under kind/name (defaults to its __name__).

Source code in src/robolens/registry.py
def register(kind: Kind, name: str | None = None) -> Callable[[F], F]:
    """Register a factory under ``kind``/``name`` (defaults to its ``__name__``)."""
    if kind not in _FACTORIES:
        raise ValueError(f"unknown registry kind {kind!r}; valid: {KINDS}")

    def decorator(factory: F) -> F:
        key = name or getattr(factory, "__name__", None)
        if key is None:
            raise ValueError("cannot determine a registry name for the factory")
        _FACTORIES[kind][key] = factory
        return factory

    return decorator

task

task(name: str | None = None) -> Callable[[F], F]

Decorator: register a task factory under name.

Source code in src/robolens/registry.py
def task(name: str | None = None) -> Callable[[F], F]:
    """Decorator: register a task factory under ``name``."""
    return register("task", name)

policy

policy(name: str | None = None) -> Callable[[F], F]

Decorator: register a policy factory under name.

Source code in src/robolens/registry.py
def policy(name: str | None = None) -> Callable[[F], F]:
    """Decorator: register a policy factory under ``name``."""
    return register("policy", name)

embodiment

embodiment(name: str | None = None) -> Callable[[F], F]

Decorator: register an embodiment factory under name.

Source code in src/robolens/registry.py
def embodiment(name: str | None = None) -> Callable[[F], F]:
    """Decorator: register an embodiment factory under ``name``."""
    return register("embodiment", name)

scorer

scorer(name: str | None = None) -> Callable[[F], F]

Decorator: register a scorer factory under name.

Source code in src/robolens/registry.py
def scorer(name: str | None = None) -> Callable[[F], F]:
    """Decorator: register a scorer factory under ``name``."""
    return register("scorer", name)

sink

sink(name: str | None = None) -> Callable[[F], F]

Decorator: register a log-sink factory under name.

Source code in src/robolens/registry.py
def sink(name: str | None = None) -> Callable[[F], F]:
    """Decorator: register a log-sink factory under ``name``."""
    return register("sink", name)

registered

registered(kind: Kind) -> dict[str, Callable[..., Any]]

Return all registered factories for kind (builtins + plugins).

Source code in src/robolens/registry.py
def registered(kind: Kind) -> dict[str, Callable[..., Any]]:
    """Return all registered factories for ``kind`` (builtins + plugins)."""
    if kind not in _FACTORIES:
        raise ValueError(f"unknown registry kind {kind!r}; valid: {KINDS}")
    _ensure_loaded()
    return dict(_FACTORIES[kind])

resolve

resolve(kind: Kind, name: str, /, **kwargs: Any) -> Any

Construct a registered component by name with the given keyword args.

Source code in src/robolens/registry.py
def resolve(kind: Kind, name: str, /, **kwargs: Any) -> Any:
    """Construct a registered component by name with the given keyword args."""
    factories = registered(kind)
    if name not in factories:
        raise KeyError(f"no {kind} named {name!r}; available: {sorted(factories)}")
    return factories[name](**kwargs)

cli

The robolens command-line interface.

Subcommands:

  • robolens list [tasks|policies|embodiments|scorers|sinks] — show registered components (builtins + installed plugins).
  • robolens run --task T --policy P --embodiment E — run an eval, resolving components from the registry. Pass constructor args with -T/-P/-E k=v.
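One way the -T/-P/-E k=v options could be parsed is with repeated argparse flags. This is a sketch, not the real CLI: the parser layout, parse_kv, the task name "cubepick" and the choice to keep values as strings are all assumptions (the page does not say how values are typed).

```python
import argparse


def parse_kv(pairs: list[str]) -> dict[str, str]:
    """Turn ["k=v", ...] into a dict; values stay strings in this sketch."""
    out: dict[str, str] = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected k=v, got {pair!r}")
        out[key] = value
    return out


parser = argparse.ArgumentParser(prog="robolens-sketch")
sub = parser.add_subparsers(dest="command", required=True)
run = sub.add_parser("run")
run.add_argument("--task", required=True)
run.add_argument("--policy", required=True)
run.add_argument("--embodiment", required=True)
# Each flag may be repeated; argparse collects the raw k=v strings in a list.
run.add_argument("-T", dest="task_args", action="append", default=[], metavar="k=v")
run.add_argument("-P", dest="policy_args", action="append", default=[], metavar="k=v")
run.add_argument("-E", dest="embodiment_args", action="append", default=[], metavar="k=v")

args = parser.parse_args(
    ["run", "--task", "cubepick", "--policy", "scripted", "--embodiment", "cubepick",
     "-P", "chunk_size=8", "-E", "goal_radius=0.1", "-E", "max_step=0.2"]
)
task_kwargs = parse_kv(args.task_args)
policy_kwargs = parse_kv(args.policy_args)
embodiment_kwargs = parse_kv(args.embodiment_args)
```

The resulting dicts would then be passed as keyword arguments when resolving each component from the registry.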

Mock world

cubepick

CubePick — a deterministic 2D toy world for exercising the full stack.

A point end-effector in the unit square must reach a cube. The action is a 2D end-effector position delta. Success is declared (and exposed as privileged info["success"]) when the effector is within goal_radius of the cube. Fully deterministic given a seed; no third-party dependencies.
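The world described above reduces to a few lines of arithmetic. The sketch below is not CubePickEmbodiment: the step function is hypothetical, per-axis clipping to ±max_step is inferred from the Box bounds, and using <= for the goal test is an assumption. The start and cube positions are the defaults visible in the constructor.

```python
import math


def step(eef, action, cube, *, max_step=0.1, goal_radius=0.05):
    """Apply one clipped 2D position delta and report whether the cube is reached."""
    # Clip each component to the action-space bounds [-max_step, max_step].
    dx = max(-max_step, min(max_step, action[0]))
    dy = max(-max_step, min(max_step, action[1]))
    new_eef = (eef[0] + dx, eef[1] + dy)
    # Success: the effector is within goal_radius of the cube (<= assumed).
    success = math.dist(new_eef, cube) <= goal_radius
    return new_eef, success


eef, cube = (0.1, 0.1), (0.8, 0.8)
steps, success = 0, False
while not success and steps < 50:
    # Always ask for the full remaining offset; the world clips it.
    eef, success = step(eef, (cube[0] - eef[0], cube[1] - eef[1]), cube)
    steps += 1
```

With a 0.7 offset per axis and a 0.1 step limit, the loop needs seven steps before the effector lands inside the goal radius.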

CubePickEmbodiment

CubePickEmbodiment(*, max_step: float = 0.1, goal_radius: float = 0.05, start: tuple[float, float] = (0.1, 0.1))

A 2D reach-the-cube simulator.

Source code in src/robolens/mock/cubepick.py
def __init__(
    self,
    *,
    max_step: float = 0.1,
    goal_radius: float = 0.05,
    start: tuple[float, float] = (0.1, 0.1),
):
    from robolens.embodiment import (
        AUTO_RESET,
        PRIVILEGED_SUCCESS,
        RENDERABLE,
        RESETTABLE,
        SEEDABLE,
        EmbodimentInfo,
    )

    self.max_step = max_step
    self.goal_radius = goal_radius
    self.start = np.asarray(start, dtype=np.float64)
    self.num_steps = 0

    self._eef = self.start.copy()
    self._cube = np.array([0.8, 0.8], dtype=np.float64)

    self.info = EmbodimentInfo(
        name="cubepick",
        action_space=Box(
            shape=(2,),
            low=np.array([-max_step, -max_step]),
            high=np.array([max_step, max_step]),
            semantics=ActionSemantics(control_mode="eef_delta_pos", frame="world"),
        ),
        observation_space=ObservationSpace(
            cameras=(CameraSpec(name="top", height=_IMG, width=_IMG, channels=3),),
            state_keys=frozenset({"eef_pos", "cube_pos"}),
        ),
        control_hz=10.0,
        is_simulated=True,
        capabilities=frozenset(
            {SEEDABLE, RESETTABLE, AUTO_RESET, PRIVILEGED_SUCCESS, RENDERABLE}
        ),
    )

policies

Mock policies for the CubePick world.

  • ScriptedPolicy — a deterministic oracle that walks the effector to the cube. It predicts a full action chunk by simulating its own future motion, so the chunk is a genuine open-loop trajectory (H > 1).
  • RandomPolicy — emits random deltas; mostly fails.
  • NoopPolicy — emits zero actions; never succeeds.
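Predicting a chunk by simulating one's own future motion, as the scripted oracle is described to do, can be sketched as follows. plan_chunk is a hypothetical helper, not ScriptedPolicy's code, and scaling each delta by its norm (so the path stays a straight line) is an assumption.

```python
import math


def plan_chunk(eef, cube, *, chunk_size=4, max_step=0.1):
    """Return chunk_size deltas that walk a simulated effector toward the cube."""
    x, y = eef
    chunk = []
    for _ in range(chunk_size):
        dx, dy = cube[0] - x, cube[1] - y
        dist = math.hypot(dx, dy)
        if dist > max_step:
            # Shorten the move to max_step while keeping its direction.
            scale = max_step / dist
            dx, dy = dx * scale, dy * scale
        chunk.append((dx, dy))
        # Advance the simulated position so the next delta starts from where
        # the effector is predicted to be, not from the current observation.
        x, y = x + dx, y + dy
    return chunk


chunk = plan_chunk((0.1, 0.1), (0.25, 0.1), chunk_size=4)
```

Here the cube is 0.15 away along x, so the chunk holds one full step, one partial step of about 0.05, and then zero deltas once the simulated effector has arrived.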

ScriptedPolicy

ScriptedPolicy(*, chunk_size: int = 4, max_step: float = 0.1)

Deterministic oracle: walk straight to the cube, in chunks.

Source code in src/robolens/mock/policies.py
def __init__(self, *, chunk_size: int = 4, max_step: float = 0.1):
    self.chunk_size = chunk_size
    self.max_step = max_step
    self.num_inferences = 0
    self.info = PolicyInfo(
        name="scripted", action_space=_ACTION_SPACE, observation_space=_SCRIPTED_OBS
    )
    self.config = PolicyConfig(action_horizon=chunk_size)

RandomPolicy

RandomPolicy(*, chunk_size: int = 4, max_step: float = 0.1, seed: int = 0)

Emit random small deltas. Deterministic given the construction seed.

Source code in src/robolens/mock/policies.py
def __init__(self, *, chunk_size: int = 4, max_step: float = 0.1, seed: int = 0):
    self.chunk_size = chunk_size
    self.max_step = max_step
    self.num_inferences = 0
    self._rng = np.random.RandomState(seed)
    self._base_seed = seed
    self._reset_count = 0
    self.info = PolicyInfo(name="random", action_space=_ACTION_SPACE)
    self.config = PolicyConfig(action_horizon=chunk_size)

NoopPolicy

NoopPolicy(*, chunk_size: int = 1)

Emit zero actions; never moves.

Source code in src/robolens/mock/policies.py
def __init__(self, *, chunk_size: int = 1):
    self.chunk_size = chunk_size
    self.num_inferences = 0
    self.info = PolicyInfo(name="noop", action_space=_ACTION_SPACE)
    self.config = PolicyConfig(action_horizon=chunk_size)