from __future__ import annotations
from typing import Any
from dataclasses import dataclass, field
JSONDict = dict[str, Any]
def _validate_vector_shape(values: list[float], size: int, name: str) -> None:
if len(values) != size:
raise ValueError(f"{name} must have size {size}, got {len(values)}")
[docs]
@dataclass(slots=True)
class Twist:
"""Linear and angular velocity in world space.
This mirrors the velocity fields carried by XR controller samples and arm
commands.
Attributes
----------
linear
Linear velocity vector ``[vx, vy, vz]``.
angular
Angular velocity vector ``[wx, wy, wz]``.
"""
linear: list[float] = field(default_factory=lambda: [0.0, 0.0, 0.0])
angular: list[float] = field(default_factory=lambda: [0.0, 0.0, 0.0])
def __post_init__(self) -> None:
_validate_vector_shape(self.linear, 3, "linear")
_validate_vector_shape(self.angular, 3, "angular")
[docs]
def to_dict(self) -> JSONDict:
"""Serialize to a JSON-compatible dictionary."""
return {"linear": self.linear, "angular": self.angular}
[docs]
@classmethod
def from_dict(cls, data: JSONDict) -> "Twist":
"""Deserialize from JSON-compatible data."""
return cls(linear=list(data["linear"]), angular=list(data["angular"]))
[docs]
@dataclass(slots=True)
class ArmState:
"""Renderable state for one simulated or pass-through arm.
This is the primary arm payload published in ``SceneState.arms`` and
described in the *Publishing Arm State and Assets* guide.
Attributes
----------
arm_id
Stable identifier for the arm.
owner_user_id
User that owns this arm, or ``None`` when no owner is associated.
base
Base pose of the arm in world space.
tip
Tip pose of the arm in world space.
centerline
Polyline points describing the arm centerline from base to tip.
radii
Per-element or per-segment radii used by the client arm renderer.
element_lengths
Optional per-segment lengths. Useful for some client render modes.
directors
Optional row-wise 3x3 orientation frames along the arm.
contact_points
Optional world-space points for contact or debug visualization.
"""
arm_id: str
owner_user_id: str | None
base: Transform
tip: Transform
centerline: list[list[float]]
radii: list[float]
element_lengths: list[float] = field(default_factory=list)
directors: list[list[list[float]]] = field(default_factory=list)
contact_points: list[list[float]] = field(default_factory=list)
def __post_init__(self) -> None:
if not self.arm_id:
raise ValueError("arm_id cannot be empty")
for idx, point in enumerate(self.centerline):
_validate_vector_shape(point, 3, f"centerline[{idx}]")
for radius in self.radii:
if radius < 0:
raise ValueError("radii cannot contain negative values")
for length in self.element_lengths:
if length < 0:
raise ValueError(
"element_lengths cannot contain negative values"
)
for idx, director in enumerate(self.directors):
if len(director) != 3:
raise ValueError(f"directors[{idx}] must be 3x3")
for jdx, row in enumerate(director):
_validate_vector_shape(row, 3, f"directors[{idx}][{jdx}]")
for idx, point in enumerate(self.contact_points):
_validate_vector_shape(point, 3, f"contact_points[{idx}]")
[docs]
def to_dict(self) -> JSONDict:
"""Serialize to a JSON-compatible dictionary."""
return {
"arm_id": self.arm_id,
"owner_user_id": self.owner_user_id,
"base": self.base.to_dict(),
"tip": self.tip.to_dict(),
"centerline": self.centerline,
"radii": self.radii,
"element_lengths": self.element_lengths,
"directors": self.directors,
"contact_points": self.contact_points,
}
[docs]
@classmethod
def from_dict(cls, data: JSONDict) -> "ArmState":
"""Deserialize from JSON-compatible data."""
return cls(
arm_id=str(data["arm_id"]),
owner_user_id=(
str(data["owner_user_id"])
if data.get("owner_user_id") is not None
else None
),
base=Transform.from_dict(data["base"]),
tip=Transform.from_dict(data["tip"]),
centerline=[list(point) for point in data["centerline"]],
radii=list(data["radii"]),
element_lengths=list(data.get("element_lengths", [])),
directors=[
[list(row) for row in director]
for director in data.get("directors", [])
],
contact_points=[
list(point) for point in data.get("contact_points", [])
],
)
[docs]
@dataclass(slots=True)
class MeshEntity:
"""Mesh instance published alongside the arm state.
Mesh entities are used for generated scenery, obstacles, props, or other
user-owned assets that should be rendered by the client.
Attributes
----------
mesh_id
Stable mesh identifier.
owner_id
User that owns the mesh entity.
asset_uri
URI pointing to the mesh asset, often a GLTF data URI.
translation
World-space translation.
rotation_xyzw
World-space quaternion rotation in ``[x, y, z, w]`` order.
scale
Non-uniform scale vector.
visible
Whether the client should render the mesh.
static_asset
If true, the server may omit ``asset_uri`` on subsequent ``scene_state``
payloads once each client has received the full mesh once (large scenery).
"""
mesh_id: str
owner_id: str
asset_uri: str
translation: list[float] = field(default_factory=lambda: [0.0, 0.0, 0.0])
rotation_xyzw: list[float] = field(
default_factory=lambda: [0.0, 0.0, 0.0, 1.0]
)
scale: list[float] = field(default_factory=lambda: [1.0, 1.0, 1.0])
visible: bool = True
static_asset: bool = False
def __post_init__(self) -> None:
if not self.mesh_id:
raise ValueError("mesh_id cannot be empty")
if not self.owner_id:
raise ValueError("owner_id cannot be empty")
if not self.asset_uri and not self.static_asset:
raise ValueError(
"asset_uri cannot be empty unless static_asset is true"
)
_validate_vector_shape(self.translation, 3, "translation")
_validate_vector_shape(self.rotation_xyzw, 4, "rotation_xyzw")
_validate_vector_shape(self.scale, 3, "scale")
[docs]
def to_client_dict(self, *, include_asset_uri: bool = True) -> JSONDict:
"""Serialize for WebSocket payloads.
Parameters
----------
include_asset_uri
When false, ``asset_uri`` is omitted so static scenery is not resent.
"""
payload: JSONDict = {
"mesh_id": self.mesh_id,
"owner_id": self.owner_id,
"translation": self.translation,
"rotation_xyzw": self.rotation_xyzw,
"scale": self.scale,
"visible": self.visible,
"static_asset": self.static_asset,
}
if include_asset_uri:
payload["asset_uri"] = self.asset_uri
return payload
[docs]
def to_dict(self) -> JSONDict:
"""Serialize to a JSON-compatible dictionary."""
return self.to_client_dict(include_asset_uri=True)
[docs]
@classmethod
def from_dict(cls, data: JSONDict) -> "MeshEntity":
"""Deserialize from JSON-compatible data."""
static_asset = bool(data.get("static_asset", False))
return cls(
mesh_id=str(data["mesh_id"]),
owner_id=str(data["owner_id"]),
asset_uri=str(data.get("asset_uri", "")),
translation=list(data.get("translation", [0.0, 0.0, 0.0])),
rotation_xyzw=list(data.get("rotation_xyzw", [0.0, 0.0, 0.0, 1.0])),
scale=list(data.get("scale", [1.0, 1.0, 1.0])),
visible=bool(data.get("visible", True)),
static_asset=static_asset,
)
[docs]
@dataclass(slots=True)
class OverlayPointsEntity:
"""Point-cloud style overlay payload for lightweight debug visuals.
This entity type is useful for contact-point trails or other transient point
markers that do not warrant a mesh.
Attributes
----------
overlay_id
Stable overlay identifier.
owner_id
User that owns the overlay.
points
World-space points to render.
point_size
Rendered point radius/size in world units.
visible
Whether the client should render the overlay.
"""
overlay_id: str
owner_id: str
points: list[list[float]] = field(default_factory=list)
point_size: float = 0.008
visible: bool = True
def __post_init__(self) -> None:
if not self.overlay_id:
raise ValueError("overlay_id cannot be empty")
if not self.owner_id:
raise ValueError("owner_id cannot be empty")
if self.point_size <= 0.0:
raise ValueError("point_size must be > 0")
for idx, point in enumerate(self.points):
_validate_vector_shape(point, 3, f"points[{idx}]")
[docs]
def to_dict(self) -> JSONDict:
"""Serialize to a JSON-compatible dictionary."""
return {
"overlay_id": self.overlay_id,
"owner_id": self.owner_id,
"points": self.points,
"point_size": self.point_size,
"visible": self.visible,
}
[docs]
@classmethod
def from_dict(cls, data: JSONDict) -> "OverlayPointsEntity":
"""Deserialize from JSON-compatible data."""
return cls(
overlay_id=str(data["overlay_id"]),
owner_id=str(data["owner_id"]),
points=[list(point) for point in data.get("points", [])],
point_size=float(data.get("point_size", 0.008)),
visible=bool(data.get("visible", True)),
)
[docs]
@dataclass(slots=True)
class SphereEntity:
"""Simple sphere primitive published as part of the scene state.
This is used for dynamic spheres such as the Cathy-throw target objects.
Attributes
----------
sphere_id
Stable sphere identifier.
owner_id
User that owns the sphere.
translation
World-space sphere center.
radius
Sphere radius in world units.
color_rgb
Display color as ``[r, g, b]`` values.
visible
Whether the client should render the sphere.
"""
sphere_id: str
owner_id: str
translation: list[float]
radius: float
color_rgb: list[float] = field(default_factory=lambda: [0.95, 0.45, 0.08])
visible: bool = True
def __post_init__(self) -> None:
if not self.sphere_id:
raise ValueError("sphere_id cannot be empty")
if not self.owner_id:
raise ValueError("owner_id cannot be empty")
_validate_vector_shape(self.translation, 3, "translation")
_validate_vector_shape(self.color_rgb, 3, "color_rgb")
if self.radius <= 0.0:
raise ValueError("radius must be > 0")
[docs]
def to_dict(self) -> JSONDict:
"""Serialize to a JSON-compatible dictionary."""
return {
"sphere_id": self.sphere_id,
"owner_id": self.owner_id,
"translation": self.translation,
"radius": self.radius,
"color_rgb": self.color_rgb,
"visible": self.visible,
}
[docs]
@classmethod
def from_dict(cls, data: JSONDict) -> "SphereEntity":
"""Deserialize from JSON-compatible data."""
return cls(
sphere_id=str(data["sphere_id"]),
owner_id=str(data["owner_id"]),
translation=list(data.get("translation", [0.0, 0.0, 0.0])),
radius=float(data["radius"]),
color_rgb=list(data.get("color_rgb", [0.95, 0.45, 0.08])),
visible=bool(data.get("visible", True)),
)
@dataclass(slots=True)
class HapticEvent:
arm_id: str
active: bool
intensity: float
def __post_init__(self) -> None:
self.active = bool(self.active)
self.intensity = float(self.intensity)
if self.intensity < 0.0:
raise ValueError("intensity must be >= 0")
def to_dict(self) -> JSONDict:
return {
"arm_id": self.arm_id,
"active": bool(self.active),
"intensity": float(self.intensity),
}
@classmethod
def from_dict(cls, data: JSONDict) -> "HapticEvent":
return cls(
arm_id=str(data["arm_id"]),
active=bool(data.get("active", False)),
intensity=float(data.get("intensity", 0.0)),
)
[docs]
@dataclass(slots=True)
class SceneState:
"""Full scene snapshot published from Python to the WebXR client.
This payload combines arm state with optional scenery and user-owned assets
and is the core update object described in the communication and asset
publishing guides.
Attributes
----------
timestamp
Scene time in seconds.
arms
Mapping from ``arm_id`` to :class:`ArmState`.
scenery
Named world transforms for extra fixed scene anchors.
user_arms
Mapping from ``user_id`` to the arm ids currently owned by that user.
meshes
Mesh instances keyed by ``mesh_id``.
overlay_points
Point overlays keyed by ``overlay_id``.
spheres
Sphere primitives keyed by ``sphere_id``.
"""
timestamp: float
arms: dict[str, ArmState]
scenery: dict[str, Transform] = field(default_factory=dict)
user_arms: dict[str, list[str]] = field(default_factory=dict)
meshes: dict[str, MeshEntity] = field(default_factory=dict)
overlay_points: dict[str, OverlayPointsEntity] = field(default_factory=dict)
spheres: dict[str, SphereEntity] = field(default_factory=dict)
haptics: list[HapticEvent] = field(default_factory=list)
def __post_init__(self) -> None:
for key, arm in self.arms.items():
if key != arm.arm_id:
raise ValueError("arms keys must match ArmState.arm_id")
[docs]
def to_dict(self) -> JSONDict:
"""Serialize to a JSON-compatible dictionary."""
return {
"timestamp": self.timestamp,
"arms": {
arm_id: arm.to_dict() for arm_id, arm in self.arms.items()
},
"scenery": {
name: transform.to_dict()
for name, transform in self.scenery.items()
},
"user_arms": self.user_arms,
"meshes": {
mesh_id: mesh.to_dict() for mesh_id, mesh in self.meshes.items()
},
"overlay_points": {
overlay_id: overlay.to_dict()
for overlay_id, overlay in self.overlay_points.items()
},
"spheres": {
sphere_id: sphere.to_dict()
for sphere_id, sphere in self.spheres.items()
},
"haptics": [event.to_dict() for event in self.haptics],
}
[docs]
def to_dict_for_client(
self, sent_static_mesh_asset_ids: set[str]
) -> JSONDict:
"""Serialize like :meth:`to_dict`, but omit repeated ``asset_uri`` for static meshes.
Updates ``sent_static_mesh_asset_ids`` in place: after a full send of a static
mesh, its ``mesh_id`` is added; ids are dropped when the mesh disappears.
"""
sent_static_mesh_asset_ids.intersection_update(self.meshes.keys())
meshes_out: dict[str, JSONDict] = {}
for mesh_id, mesh in self.meshes.items():
if mesh.static_asset and mesh_id in sent_static_mesh_asset_ids:
meshes_out[mesh_id] = mesh.to_client_dict(
include_asset_uri=False
)
else:
meshes_out[mesh_id] = mesh.to_client_dict(
include_asset_uri=True
)
if mesh.static_asset:
sent_static_mesh_asset_ids.add(mesh_id)
return {
"timestamp": self.timestamp,
"arms": {
arm_id: arm.to_dict() for arm_id, arm in self.arms.items()
},
"scenery": {
name: transform.to_dict()
for name, transform in self.scenery.items()
},
"user_arms": self.user_arms,
"meshes": meshes_out,
"overlay_points": {
overlay_id: overlay.to_dict()
for overlay_id, overlay in self.overlay_points.items()
},
"spheres": {
sphere_id: sphere.to_dict()
for sphere_id, sphere in self.spheres.items()
},
"haptics": [event.to_dict() for event in self.haptics],
}
[docs]
@classmethod
def from_dict(cls, data: JSONDict) -> "SceneState":
"""Deserialize from JSON-compatible data."""
return cls(
timestamp=float(data["timestamp"]),
arms={
arm_id: ArmState.from_dict(arm_data)
for arm_id, arm_data in data["arms"].items()
},
scenery={
name: Transform.from_dict(transform)
for name, transform in data.get("scenery", {}).items()
},
user_arms={
user_id: list(arm_ids)
for user_id, arm_ids in data.get("user_arms", {}).items()
},
meshes={
mesh_id: MeshEntity.from_dict(mesh_data)
for mesh_id, mesh_data in data.get("meshes", {}).items()
},
overlay_points={
overlay_id: OverlayPointsEntity.from_dict(overlay_data)
for overlay_id, overlay_data in data.get(
"overlay_points", {}
).items()
},
spheres={
sphere_id: SphereEntity.from_dict(sphere_data)
for sphere_id, sphere_data in data.get("spheres", {}).items()
},
haptics=[
HapticEvent.from_dict(event)
for event in data.get("haptics", [])
],
)