Source code for mani_skill.envs.tasks.tabletop.pick_clutter_ycb

import os
from typing import Union

import numpy as np
import sapien
import torch

from mani_skill import ASSET_DIR
from mani_skill.agents.robots import Fetch, Panda
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.building.actor_builder import ActorBuilder
from mani_skill.utils.io_utils import load_json
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs import Actor, Pose
from mani_skill.utils.structs.types import GPUMemoryConfig, SimConfig


#
[docs]class PickClutterEnv(BaseEnv): """Base environment picking items out of clutter type of tasks. Flexibly supports using different configurations and object datasets"""
[docs] SUPPORTED_REWARD_MODES = ["none"]
[docs] SUPPORTED_ROBOTS = ["panda", "fetch"]
[docs] agent: Union[Panda, Fetch]
[docs] DEFAULT_EPISODE_JSON: str
[docs] DEFAULT_ASSET_ROOT: str
[docs] DEFAULT_MODEL_JSON: str
def __init__( self, *args, robot_uids="panda", robot_init_qpos_noise=0.02, num_envs=1, reconfiguration_freq=None, episode_json: str = None, **kwargs, ):
[docs] self.robot_init_qpos_noise = robot_init_qpos_noise
if episode_json is None: episode_json = self.DEFAULT_EPISODE_JSON if not os.path.exists(episode_json): raise FileNotFoundError( f"Episode json ({episode_json}) is not found." "To download default json:" "`python -m mani_skill.utils.download_asset pick_clutter_ycb`." )
[docs] self._episodes: list[dict] = load_json(episode_json)
if reconfiguration_freq is None: if num_envs == 1: reconfiguration_freq = 1 else: reconfiguration_freq = 0 super().__init__( *args, robot_uids=robot_uids, num_envs=num_envs, reconfiguration_freq=reconfiguration_freq, **kwargs, ) @property
[docs] def _default_sim_config(self): return SimConfig( gpu_memory_config=GPUMemoryConfig( max_rigid_contact_count=2**21, max_rigid_patch_count=2**19 ) )
@property
[docs] def _default_sensor_configs(self): pose = sapien_utils.look_at(eye=[0.3, 0, 0.6], target=[-0.1, 0, 0.1]) return [ CameraConfig( "base_camera", pose=pose, width=128, height=128, fov=np.pi / 2, near=0.01, far=100, ) ]
@property
[docs] def _default_human_render_camera_configs(self): pose = sapien_utils.look_at([0.6, 0.7, 0.6], [0.0, 0.0, 0.35]) return CameraConfig( "render_camera", pose=pose, width=512, height=512, fov=1, near=0.01, far=100 )
[docs] def _load_model(self, model_id: str) -> ActorBuilder: raise NotImplementedError()
[docs] def _load_agent(self, options: dict): super()._load_agent(options, sapien.Pose(p=[-0.615, 0, 0]))
[docs] def _load_scene(self, options: dict): self.scene_builder = TableSceneBuilder( self, robot_init_qpos_noise=self.robot_init_qpos_noise ) self.scene_builder.build() # sample some clutter configurations eps_idxs = self._batched_episode_rng.randint(0, len(self._episodes)) self.selectable_target_objects: list[list[Actor]] = [] """for each sub-scene, a list of objects that can be selected as targets""" all_objects = [] for i, eps_idx in enumerate(eps_idxs): self.selectable_target_objects.append([]) episode = self._episodes[eps_idx] for actor_config in episode["actors"]: builder = self._load_model(actor_config["model_id"]) init_pose = actor_config["pose"] builder.initial_pose = sapien.Pose(p=init_pose[:3], q=init_pose[3:]) builder.set_scene_idxs([i]) obj = builder.build(name=f"set_{i}_{actor_config['model_id']}") all_objects.append(obj) if actor_config["rep_pts"] is not None: # rep_pts is representative points, representing visible points # we only permit selecting target objects that are visible self.selectable_target_objects[-1].append(obj) self.all_objects = Actor.merge(all_objects, name="all_objects") self.goal_site = actors.build_sphere( self.scene, radius=0.01, color=[0, 1, 0, 1], name="goal_site", body_type="kinematic", add_collision=False, initial_pose=sapien.Pose(), ) self._hidden_objects.append(self.goal_site) self._sample_target_objects()
[docs] def _sample_target_objects(self): # note this samples new target objects for every sub-scene target_objects = [] for i in range(self.num_envs): selected_obj_idxs = torch.randint(low=0, high=99999, size=(self.num_envs,)) selected_obj_idxs[i] = selected_obj_idxs[i] % len( self.selectable_target_objects[-1] ) target_objects.append( self.selectable_target_objects[-1][selected_obj_idxs[i]] ) self.target_object = Actor.merge(target_objects, name="target_object")
[docs] def _initialize_episode(self, env_idx: torch.Tensor, options: dict): with torch.device(self.device): b = len(env_idx) self.scene_builder.initialize(env_idx) goal_pos = torch.rand(size=(b, 3)) * torch.tensor( [0.3, 0.5, 0.1] ) + torch.tensor([-0.15, -0.25, 0.35]) self.goal_pos = goal_pos self.goal_site.set_pose(Pose.create_from_pq(self.goal_pos)) # reset objects to original poses if b == self.num_envs: # if all envs reset self.all_objects.pose = self.all_objects.initial_pose else: # if only some envs reset, we unfortunately still have to do some mask wrangling mask = torch.isin(self.all_objects._scene_idxs, env_idx) self.all_objects.pose = self.all_objects.initial_pose[mask]
[docs] def evaluate(self): return { "success": torch.zeros(self.num_envs, device=self.device, dtype=bool), "fail": torch.zeros(self.num_envs, device=self.device, dtype=bool), }
[docs] def _get_obs_extra(self, info: dict): return dict()
@register_env( "PickClutterYCB-v1", asset_download_ids=["ycb", "pick_clutter_ycb_configs"], max_episode_steps=100, )
[docs]class PickClutterYCBEnv(PickClutterEnv):
[docs] DEFAULT_EPISODE_JSON = f"{ASSET_DIR}/tasks/pick_clutter/ycb_train_5k.json.gz"
[docs] def _load_model(self, model_id): builder = actors.get_actor_builder(self.scene, id=f"ycb:{model_id}") return builder