import os
from typing import Any, Optional, Union
import numpy as np
import sapien
import torch
from transforms3d.euler import euler2quat
from mani_skill.agents.base_agent import BaseAgent, Keyframe
from mani_skill.agents.controllers import *
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.utils import randomization, rewards
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import common, sapien_utils
from mani_skill.utils.building.ground import build_ground
from mani_skill.utils.geometry import rotation_conversions
from mani_skill.utils.registration import register_env
from mani_skill.utils.structs.pose import Pose
from mani_skill.utils.structs.types import Array, SceneConfig, SimConfig
[docs]MJCF_FILE = f"{os.path.join(os.path.dirname(__file__), 'assets/ant.xml')}"
[docs]class AntRobot(BaseAgent):
[docs] keyframes = dict(
stand=Keyframe(
qpos=np.array([0, 0, 0, 0, 1, -1, -1, 1]),
pose=sapien.Pose(p=[0, 0, -0.175], q=euler2quat(0, 0, np.pi / 2)),
)
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@property
[docs] def _controller_configs(self):
body = PDJointPosControllerConfig(
[joint.name for joint in self.robot.get_active_joints()],
lower=-1,
upper=1,
damping=1e2,
stiffness=1e3,
use_delta=True,
)
return deepcopy_dict(
dict(
pd_joint_delta_pos=dict(
body=body,
balance_passive_force=False,
),
)
)
[docs] def _load_articulation(
self, initial_pose: Optional[Union[sapien.Pose, Pose]] = None
):
"""
Load the robot articulation
"""
loader = self.scene.create_mjcf_loader()
asset_path = str(self.mjcf_path)
loader.name = self.uid
builder = loader.parse(asset_path)["articulation_builders"][0]
builder.initial_pose = initial_pose
self.robot = builder.build()
assert self.robot is not None, f"Fail to load URDF/MJCF from {asset_path}"
self.robot_link_ids = [link.name for link in self.robot.get_links()]
[docs]class AntEnv(BaseEnv):
[docs] SUPPORTED_REWARD_MODES = ("normalized_dense", "dense", "none")
def __init__(self, *args, robot_uids=AntRobot, move_speed=0, **kwargs):
super().__init__(*args, robot_uids=robot_uids, **kwargs)
[docs] self.move_speed = move_speed
@property
[docs] def _default_sim_config(self):
return SimConfig(
scene_config=SceneConfig(
solver_position_iterations=4, solver_velocity_iterations=1
),
spacing=20,
sim_freq=200,
control_freq=40,
)
@property
[docs] def _default_sensor_configs(self):
return [
CameraConfig(
uid="side_cam",
pose=sapien_utils.look_at(eye=[0.5, -2, 1], target=[0, 0, 0]),
width=128,
height=128,
fov=60 * np.pi / 180,
near=0.01,
far=100,
mount=self.camera_mount,
),
]
@property
[docs] def _default_human_render_camera_configs(self):
return [
CameraConfig(
uid="training_side_vis",
pose=sapien_utils.look_at(eye=[0.5, -2, 1], target=[0, 0, 0]),
width=512,
height=512,
fov=60 * np.pi / 180,
near=0.01,
far=100,
mount=self.camera_mount,
),
]
[docs] def _load_scene(self, options: dict):
loader = self.scene.create_mjcf_loader()
actor_builders = loader.parse(self.agent.mjcf_path)["actor_builders"]
for a in actor_builders:
a.build(a.name)
self.ground = build_ground(self.scene, floor_width=500)
# allow tracking of ant
self.camera_mount = self.scene.create_actor_builder().build_kinematic(
"camera_mount"
)
# cache for com velocity calc
self.active_links = [
link for link in self.agent.robot.get_links() if "dummy" not in link.name
]
self.robot_links_mass = torch.stack(
[link.mass[0] for link in self.active_links]
).to(self.device)
self.robot_mass = torch.sum(self.robot_links_mass).item()
self.force_sensor_links = [
link.name for link in self.active_links if "foot" in link.name
]
[docs] def _initialize_episode(self, env_idx: torch.Tensor, options: dict):
with torch.device(self.device):
b = len(env_idx)
# set agent root pose - torso now centered at dummy root at (0,0,0)
self.agent.robot.set_root_pose(self.agent.keyframes["stand"].pose)
# set randomized qpos
base_qpos = (
torch.tensor(self.agent.keyframes["stand"].qpos, dtype=torch.float32)
.unsqueeze(0)
.repeat(b, 1)
)
noise_scale = 1e-2
qpos_noise = (
torch.rand(b, self.agent.robot.dof[0]) * (2 * noise_scale)
) - noise_scale
qvel_noise = (
torch.rand(b, self.agent.robot.dof[0]) * (2 * noise_scale)
) - noise_scale
self.agent.robot.set_qpos(base_qpos + qpos_noise)
self.agent.robot.set_qvel(qvel_noise)
# set the camera to begin tracking the agent
self.camera_mount.set_pose(
Pose.create_from_pq(p=self.agent.robot.links_map["torso"].pose.p)
)
# reset the camera mount every timstep
# necessary because we want camera to follow torso pos only, not orientation
[docs] def _after_control_step(self):
self.camera_mount.set_pose(
Pose.create_from_pq(p=self.agent.robot.links_map["torso"].pose.p)
)
# gpu requires that we manually apply this update
if self.gpu_sim_enabled:
# we update just actor pose here, no need to call apply_all/fetch_all
self.scene.px.gpu_apply_rigid_dynamic_data()
self.scene.px.gpu_fetch_rigid_dynamic_data()
@property
[docs] def get_vels(self):
"""Returns linvel and angvel of each link and cmass linvel of robot"""
angvels = torch.stack(
[link.get_angular_velocity() for link in self.active_links], dim=1
) # (num_links, b, 3)
linvels = torch.stack(
[link.get_linear_velocity() for link in self.active_links], dim=1
) # (num_links, b, 3)
batch_angvels = angvels.view(-1, len(self.active_links) * 3)
batch_linvels = linvels.view(-1, len(self.active_links) * 3)
com_vel = linvels * self.robot_links_mass.view(1, -1, 1)
com_vel = com_vel.sum(dim=1) / self.robot_mass # (b, 3)
return batch_angvels, batch_linvels, com_vel
@property
[docs] def torso_height(self):
return self.agent.robot.links_map["torso"].pose.raw_pose[:, -1]
@property
@property
[docs] def link_orientations(self):
return torch.stack([link.pose.q for link in self.active_links], dim=-1).view(
-1, len(self.active_links) * 4
)
# cache re-used computation
[docs] def evaluate(self) -> dict:
link_angvels, link_linvels, cmass_linvel = self.get_vels
return dict(
link_angvels=link_angvels,
link_linvels=link_linvels,
cmass_linvel=cmass_linvel,
)
[docs] def move_x_rew(self, info, move_speed=_WALK_SPEED):
com_vel_x = info["cmass_linvel"][:, 0]
return rewards.tolerance(
com_vel_x,
lower=move_speed,
upper=np.inf,
margin=move_speed,
value_at_margin=0,
sigmoid="linear",
).view(-1)
[docs] def standing_rew(self):
return rewards.tolerance(
self.torso_height,
lower=_STAND_HEIGHT,
upper=float("inf"),
margin=_STAND_HEIGHT / 4,
).view(-1)
[docs] def control_rew(self, action: Array):
return (
rewards.tolerance(action, margin=1, value_at_margin=0, sigmoid="quadratic")
.mean(dim=-1)
.view(-1)
)
[docs] def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: dict):
small_control = (4 + self.control_rew(action)) / 5
return (
small_control * self.move_x_rew(info, self.move_speed) * self.standing_rew()
)
[docs] def compute_normalized_dense_reward(
self, obs: Any, action: torch.Tensor, info: dict
):
return self.compute_dense_reward(obs, action, info)
@register_env("MS-AntWalk-v1", max_episode_steps=1000)
[docs]class AntWalk(AntEnv):
"""
**Task Description:**
Ant moves in x direction at 0.5 m/s
**Randomizations:**
- Ant qpos and qvel have added noise from uniform distribution [-1e-2, 1e-2]
**Success Conditions:**
- No specific success conditions.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, robot_uids=AntRobot, move_speed=_WALK_SPEED, **kwargs)
@register_env("MS-AntRun-v1", max_episode_steps=1000)
[docs]class AntRun(AntEnv):
"""
**Task Description:**
Ant moves in x direction at 4 m/s
**Randomizations:**
- Ant qpos and qvel have added noise from uniform distribution [-1e-2, 1e-2]
**Success Conditions:**
- No specific success conditions.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, robot_uids=AntRobot, move_speed=_RUN_SPEED, **kwargs)