Source code for mani_skill.envs.tasks.tabletop.pick_single_ycb

from typing import Any, Union

import numpy as np
import sapien
import torch

from mani_skill import ASSET_DIR
from mani_skill.agents.robots.fetch.fetch import Fetch
from mani_skill.agents.robots.panda.panda import Panda
from mani_skill.agents.robots.panda.panda_wristcam import PandaWristCam
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.utils.randomization.pose import random_quaternions
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import common, sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.io_utils import load_json
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs.actor import Actor
from mani_skill.utils.structs.pose import Pose
from mani_skill.utils.structs.types import GPUMemoryConfig, SimConfig

[docs]WARNED_ONCE = False
@register_env("PickSingleYCB-v1", max_episode_steps=50, asset_download_ids=["ycb"])
[docs]class PickSingleYCBEnv(BaseEnv): """ **Task Description:** Pick up a random object sampled from the [YCB dataset](https://www.ycbbenchmarks.com/) and move it to a random goal position **Randomizations:** - the object's xy position is randomized on top of a table in the region [0.1, 0.1] x [-0.1, -0.1]. It is placed flat on the table - the object's z-axis rotation is randomized - the object geometry is randomized by randomly sampling any YCB object. (during reconfiguration) **Success Conditions:** - the object position is within goal_thresh (default 0.025) euclidean distance of the goal position - the robot is static (q velocity < 0.2) **Goal Specification:** - 3D goal position (also visualized in human renders) **Additional Notes** - On GPU simulation, in order to collect data from every possible object in the YCB database we recommend using at least 128 parallel environments or more, otherwise you will need to reconfigure in order to sample new objects. """
[docs] SUPPORTED_ROBOTS = ["panda", "panda_wristcam", "fetch"]
[docs] agent: Union[Panda, PandaWristCam, Fetch]
[docs] goal_thresh = 0.025
def __init__( self, *args, robot_uids="panda_wristcam", robot_init_qpos_noise=0.02, num_envs=1, reconfiguration_freq=None, **kwargs, ):
[docs] self.robot_init_qpos_noise = robot_init_qpos_noise
[docs] self.model_id = None
[docs] self.all_model_ids = np.array( [ k for k in load_json( ASSET_DIR / "assets/mani_skill2_ycb/info_pick_v0.json" ).keys() if k not in [ "022_windex_bottle", "028_skillet_lid", "029_plate", "059_chain", ] # NOTE (arth): ignore these non-graspable/hard to grasp ycb objects ] )
if reconfiguration_freq is None: if num_envs == 1: reconfiguration_freq = 1 else: reconfiguration_freq = 0 super().__init__( *args, robot_uids=robot_uids, reconfiguration_freq=reconfiguration_freq, num_envs=num_envs, **kwargs, ) @property
[docs] def _default_sim_config(self): return SimConfig( gpu_memory_config=GPUMemoryConfig( max_rigid_contact_count=2**20, max_rigid_patch_count=2**19 ) )
@property
[docs] def _default_sensor_configs(self): pose = sapien_utils.look_at(eye=[0.3, 0, 0.6], target=[-0.1, 0, 0.1]) return [CameraConfig("base_camera", pose, 128, 128, np.pi / 2, 0.01, 100)]
@property
[docs] def _default_human_render_camera_configs(self): pose = sapien_utils.look_at([0.6, 0.7, 0.6], [0.0, 0.0, 0.35]) return CameraConfig("render_camera", pose, 512, 512, 1, 0.01, 100)
[docs] def _load_agent(self, options: dict): super()._load_agent(options, sapien.Pose(p=[-0.615, 0, 0]))
[docs] def _load_scene(self, options: dict): global WARNED_ONCE self.table_scene = TableSceneBuilder( env=self, robot_init_qpos_noise=self.robot_init_qpos_noise ) self.table_scene.build() # randomize the list of all possible models in the YCB dataset # then sub-scene i will load model model_ids[i % number_of_ycb_objects] model_ids = self._batched_episode_rng.choice(self.all_model_ids, replace=True) if ( self.num_envs > 1 and self.num_envs < len(self.all_model_ids) and self.reconfiguration_freq <= 0 and not WARNED_ONCE ): WARNED_ONCE = True print( """There are less parallel environments than total available models to sample. Not all models will be used during interaction even after resets unless you call env.reset(options=dict(reconfigure=True)) or set reconfiguration_freq to be >= 1.""" ) self._objs: list[Actor] = [] self.obj_heights = [] for i, model_id in enumerate(model_ids): # TODO: before official release we will finalize a metadata dataclass that these build functions should return. builder = actors.get_actor_builder( self.scene, id=f"ycb:{model_id}", ) builder.initial_pose = sapien.Pose(p=[0, 0, 0]) builder.set_scene_idxs([i]) self._objs.append(builder.build(name=f"{model_id}-{i}")) self.remove_from_state_dict_registry(self._objs[-1]) self.obj = Actor.merge(self._objs, name="ycb_object") self.add_to_state_dict_registry(self.obj) self.goal_site = actors.build_sphere( self.scene, radius=self.goal_thresh, color=[0, 1, 0, 1], name="goal_site", body_type="kinematic", add_collision=False, initial_pose=sapien.Pose(), ) self._hidden_objects.append(self.goal_site)
[docs] def _after_reconfigure(self, options: dict): self.object_zs = [] for obj in self._objs: collision_mesh = obj.get_first_collision_mesh() # this value is used to set object pose so the bottom is at z=0 self.object_zs.append(-collision_mesh.bounding_box.bounds[0, 2]) self.object_zs = common.to_tensor(self.object_zs, device=self.device)
[docs] def _initialize_episode(self, env_idx: torch.Tensor, options: dict): with torch.device(self.device): b = len(env_idx) self.table_scene.initialize(env_idx) xyz = torch.zeros((b, 3)) xyz[:, :2] = torch.rand((b, 2)) * 0.2 - 0.1 xyz[:, 2] = self.object_zs[env_idx] qs = random_quaternions(b, lock_x=True, lock_y=True) self.obj.set_pose(Pose.create_from_pq(p=xyz, q=qs)) goal_xyz = torch.zeros((b, 3)) goal_xyz[:, :2] = torch.rand((b, 2)) * 0.2 - 0.1 goal_xyz[:, 2] = torch.rand((b)) * 0.3 + xyz[:, 2] self.goal_site.set_pose(Pose.create_from_pq(goal_xyz)) # Initialize robot arm to a higher position above the table than the default typically used for other table top tasks if self.robot_uids == "panda" or self.robot_uids == "panda_wristcam": # fmt: off qpos = np.array( [0.0, 0, 0, -np.pi * 2 / 3, 0, np.pi * 2 / 3, np.pi / 4, 0.04, 0.04] ) # fmt: on qpos[:-2] += self._episode_rng.normal( 0, self.robot_init_qpos_noise, len(qpos) - 2 ) self.agent.reset(qpos) self.agent.robot.set_root_pose(sapien.Pose([-0.615, 0, 0])) else: raise NotImplementedError(self.robot_uids)
[docs] def evaluate(self): obj_to_goal_pos = self.goal_site.pose.p - self.obj.pose.p is_obj_placed = torch.linalg.norm(obj_to_goal_pos, axis=1) <= self.goal_thresh is_grasped = self.agent.is_grasping(self.obj) is_robot_static = self.agent.is_static(0.2) return dict( is_grasped=is_grasped, obj_to_goal_pos=obj_to_goal_pos, is_obj_placed=is_obj_placed, is_robot_static=is_robot_static, is_grasping=self.agent.is_grasping(self.obj), success=torch.logical_and(is_obj_placed, is_robot_static), )
[docs] def _get_obs_extra(self, info: dict): obs = dict( tcp_pose=self.agent.tcp.pose.raw_pose, goal_pos=self.goal_site.pose.p, is_grasped=info["is_grasped"], ) if "state" in self.obs_mode: obs.update( tcp_to_goal_pos=self.goal_site.pose.p - self.agent.tcp.pose.p, obj_pose=self.obj.pose.raw_pose, tcp_to_obj_pos=self.obj.pose.p - self.agent.tcp.pose.p, obj_to_goal_pos=self.goal_site.pose.p - self.obj.pose.p, ) return obs
[docs] def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: dict): tcp_to_obj_dist = torch.linalg.norm( self.obj.pose.p - self.agent.tcp.pose.p, axis=1 ) reaching_reward = 1 - torch.tanh(5 * tcp_to_obj_dist) reward = reaching_reward is_grasped = info["is_grasped"] reward += is_grasped obj_to_goal_dist = torch.linalg.norm( self.goal_site.pose.p - self.obj.pose.p, axis=1 ) place_reward = 1 - torch.tanh(5 * obj_to_goal_dist) reward += place_reward * is_grasped reward += info["is_obj_placed"] * is_grasped static_reward = 1 - torch.tanh( 5 * torch.linalg.norm(self.agent.robot.get_qvel()[..., :-2], axis=1) ) reward += static_reward * info["is_obj_placed"] * is_grasped reward[info["success"]] = 6 return reward
[docs] def compute_normalized_dense_reward( self, obs: Any, action: torch.Tensor, info: dict ): return self.compute_dense_reward(obs=obs, action=action, info=info) / 6