Source code for mani_skill.envs.utils.observations.observations

"""
Functions that map an observation to a particular format, e.g. mapping the raw images to RGB-D or point cloud formats
"""


import numpy as np
import sapien.physx as physx
import torch

from mani_skill.render import SAPIEN_RENDER_SYSTEM
from mani_skill.sensors.base_sensor import BaseSensor, BaseSensorConfig
from mani_skill.sensors.camera import Camera
from mani_skill.utils import common


def sensor_data_to_pointcloud(observation: dict, sensors: dict[str, BaseSensor]):
    """Replace the camera images in ``observation`` with a fused point cloud.

    Every entry of ``observation["sensor_data"]`` whose sensor is a ``Camera``
    is flattened into per-pixel tensors, removed from
    ``observation["sensor_data"]``, and the tensors of all cameras are joined
    along the point axis (dim 1) under ``observation["pointcloud"]``.

    Args:
        observation: Observation dict, modified in place. It must provide
            ``"sensor_data"`` (uid -> image dict holding ``"position"`` and
            ``"segmentation"``, optionally ``"rgb"``) and ``"sensor_param"``
            (uid -> dict holding ``"cam2world_gl"``).
        sensors: Sensors keyed by uid. They are walked in lockstep with
            ``observation["sensor_data"]``, so both mappings must list the
            same uids in the same order.

    Returns:
        The same ``observation`` object. ``observation["pointcloud"]`` holds
        ``"xyzw"`` (B, N, 4), ``"segmentation"`` (B, N, 1) and, when the
        cameras provide colour, ``"rgb"`` (B, N, 3), where B is the leading
        dimension of the images.

    Raises:
        AssertionError: If the uids of the two mappings do not line up.
    """
    sensor_data = observation["sensor_data"]
    sensor_params = observation["sensor_param"]
    per_camera = {}

    paired = zip(sensor_data.items(), sensors.items())
    for (data_uid, images), (sensor_uid, sensor) in paired:
        assert data_uid == sensor_uid
        if not isinstance(sensor, Camera):
            # Non-camera sensors are left untouched in observation["sensor_data"].
            continue

        # TODO: double check if the .clone()s are necessary
        # The clones keep the in-place scaling below (and the returned
        # tensors) from aliasing the caller's image buffers.
        points = images["position"].clone().float()
        seg_ids = images["segmentation"].clone()

        # Raw positions are in millimeters (OpenGL camera space); convert to meters.
        points[..., :3] /= 1000.0

        # Append a homogeneous coordinate that is 1 for pixels with a non-zero
        # segmentation id and 0 for the background (id 0), then transform
        # the flattened points to world space.
        cam2world = sensor_params[data_uid]["cam2world_gl"].to(points.device)
        batch = points.shape[0]
        homogeneous = torch.cat([points, seg_ids != 0], dim=-1).reshape(batch, -1, 4)
        cloud = {"xyzw": homogeneous @ cam2world.transpose(1, 2)}

        # Extra per-point channels, flattened the same way as the points.
        if "rgb" in images:
            colors = images["rgb"][..., :3].clone()
            cloud["rgb"] = colors.reshape(colors.shape[0], -1, 3)
        if "segmentation" in images:
            cloud["segmentation"] = seg_ids.reshape(seg_ids.shape[0], -1, 1)

        per_camera[data_uid] = cloud

    # The converted cameras no longer appear as raw sensor data.
    for uid in per_camera:
        del sensor_data[uid]

    # merge_dicts is expected to gather, per key, the tensors of every camera;
    # they are then joined along the point axis.
    merged = common.merge_dicts(per_camera.values())
    for channel, parts in merged.items():
        merged[channel] = torch.cat(parts, dim=1)
    observation["pointcloud"] = merged

    # NOTE: the segmentation channel is returned as a torch tensor; no numpy
    # conversion is performed here.
    return observation