import copy
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional, Union
import gymnasium as gym
import h5py
import numpy as np
import sapien.physx as physx
import torch
from mani_skill import get_commit_info
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.utils import common, gym_utils, sapien_utils
from mani_skill.utils.io_utils import dump_json
from mani_skill.utils.logging_utils import logger
from mani_skill.utils.structs.types import Array
from mani_skill.utils.visualization.misc import (
images_to_video,
put_info_on_image,
tile_images,
)
from mani_skill.utils.wrappers import CPUGymWrapper
# NOTE (stao): The code for record.py is quite messy and perhaps confusing as it is trying to support both recording on CPU and GPU seamlessly
# and handle partial resets. It works but can be claned up a lot.
[docs]def parse_env_info(env: gym.Env):
# spec can be None if not initialized from gymnasium.make
env = env.unwrapped
if env.spec is None:
return None
if hasattr(env.spec, "_kwargs"):
# gym<=0.21
env_kwargs = env.spec._kwargs
else:
# gym>=0.22
env_kwargs = env.spec.kwargs
return dict(
env_id=env.spec.id,
env_kwargs=env_kwargs,
)
[docs]def temp_deep_print_shapes(x, prefix=""):
if isinstance(x, dict):
for k in x:
temp_deep_print_shapes(x[k], prefix=prefix + "/" + k)
else:
print(prefix, x.shape)
[docs]def clean_trajectories(h5_file: h5py.File, json_dict: dict, prune_empty_action=True):
"""Clean trajectories by renaming and pruning trajectories in place.
After cleanup, trajectory names are consecutive integers (traj_0, traj_1, ...),
and trajectories with empty action are pruned.
Args:
h5_file: raw h5 file
json_dict: raw JSON dict
prune_empty_action: whether to prune trajectories with empty action
"""
json_episodes = json_dict["episodes"]
assert len(h5_file) == len(json_episodes)
# Assumes each trajectory is named "traj_{i}"
prefix_length = len("traj_")
ep_ids = sorted([int(x[prefix_length:]) for x in h5_file.keys()])
new_json_episodes = []
new_ep_id = 0
for i, ep_id in enumerate(ep_ids):
traj_id = f"traj_{ep_id}"
ep = json_episodes[i]
assert ep["episode_id"] == ep_id
new_traj_id = f"traj_{new_ep_id}"
if prune_empty_action and ep["elapsed_steps"] == 0:
del h5_file[traj_id]
continue
if new_traj_id != traj_id:
ep["episode_id"] = new_ep_id
h5_file[new_traj_id] = h5_file[traj_id]
del h5_file[traj_id]
new_json_episodes.append(ep)
new_ep_id += 1
json_dict["episodes"] = new_json_episodes
@dataclass
[docs]class Step:
[docs] observation: np.ndarray
[docs] env_episode_ptr: np.ndarray
"""points to index in above data arrays where current episode started (any data before should already be flushed)"""
[docs] success: np.ndarray = None
[docs] fail: np.ndarray = None
[docs]class RecordEpisode(gym.Wrapper):
"""Record trajectories or videos for episodes. You generally should always apply this wrapper last, particularly if you include
observation wrappers which modify the returned observations. The only wrappers that may go after this one is any of the vector env
interface wrappers that map the maniskill env to a e.g. gym vector env interface.
Trajectory data is saved with two files, the actual data in a .h5 file via H5py and metadata in a JSON file of the same basename.
Each JSON file contains:
- `env_info` (dict): task (also known as environment) information, which can be used to initialize the task
- `env_id` (str): task id
- `max_episode_steps` (int)
- `env_kwargs` (dict): keyword arguments to initialize the task. **Essential to recreate the environment.**
- `episodes` (list[dict]): episode information
- `source_type` (Optional[str]): a simple category string describing what process generated the trajectory data. ManiSkill official datasets will usually write one of "human", "motionplanning", or "rl" at the moment.
- `source_desc` (Optional[str]): a longer explanation of how the data was generated.
The episode information (the element of `episodes`) includes:
- `episode_id` (int): a unique id to index the episode
- `reset_kwargs` (dict): keyword arguments to reset the task. **Essential to reproduce the trajectory.**
- `control_mode` (str): control mode used for the episode.
- `elapsed_steps` (int): trajectory length
- `info` (dict): information at the end of the episode.
With just the meta data, you can reproduce the task the same way it was created when the trajectories were collected as so:
```python
env = gym.make(env_info["env_id"], **env_info["env_kwargs"])
episode = env_info["episodes"][0] # picks the first
env.reset(**episode["reset_kwargs"])
```
Each HDF5 demonstration dataset consists of multiple trajectories. The key of each trajectory is `traj_{episode_id}`, e.g., `traj_0`.
Each trajectory is an `h5py.Group`, which contains:
- actions: [T, A], `np.float32`. `T` is the number of transitions.
- terminated: [T], `np.bool_`. It indicates whether the task is terminated or not at each time step.
- truncated: [T], `np.bool_`. It indicates whether the task is truncated or not at each time step.
- env_states: [T+1, D], `np.float32`. Environment states. It can be used to set the environment to a certain state via `env.set_state_dict`. However, it may not be enough to reproduce the trajectory.
- success (optional): [T], `np.bool_`. It indicates whether the task is successful at each time step. Included if task defines success.
- fail (optional): [T], `np.bool_`. It indicates whether the task is in a failure state at each time step. Included if task defines failure.
- obs (optional): [T+1, D] observations.
Note that env_states is in a dictionary form (and observations may be as well depending on obs_mode), where it is formatted as a dictionary of lists. For example, a typical environment state looks like this:
```python
env_state = env.get_state_dict()
\"\"\"
env_state = {
"actors": {
"actor_id": [...numpy_actor_state...],
...
},
"articulations": {
"articulation_id": [...numpy_articulation_state...],
...
}
}
\"\"\"
```
In the trajectory file env_states will be the same structure but each value/leaf in the dictionary will be a sequence of states representing the state of that particular entity in the simulation over time.
In practice it is may be more useful to use slices of the env_states data (or the observations data), which can be done with
```python
import mani_skill.trajectory.utils as trajectory_utils
env_states = trajectory_utils.dict_to_list_of_dicts(env_states)
# now env_states[i] is the same as the data env.get_state_dict() returned at timestep i
i = 10
env_state_i = trajectory_utils.index_dict(env_states, i)
# now env_state_i is the same as the data env.get_state_dict() returned at timestep i
```
Args:
env: the environment to record
output_dir: output directory
save_trajectory: whether to save trajectory
trajectory_name: name of trajectory file (.h5). Use timestamp if not provided.
save_video: whether to save video
info_on_video: whether to write data about reward, action, and data in the info object to the video. The first video frame is generally the result
of the first env.reset() (visualizing the first observation). Text is written on frames after that, showing the action taken to get to that
environment state and reward.
save_on_reset: whether to save the previous trajectory (and video of it if `save_video` is True) automatically when resetting.
Not that for environments simulated on the GPU (to leverage fast parallel rendering) you must
set `max_steps_per_video` to a fixed number so that every `max_steps_per_video` steps a video is saved. This is
required as there may be partial environment resets which makes it ambiguous about how to save/cut videos.
save_video_trigger: a function that takes the current number of elapsed environment steps and outputs a bool. If output is True, will start saving that timestep to the video.
max_steps_per_video: how many steps can be recorded into a single video before flushing the video. If None this is not used. A internal step counter is maintained to do this.
If the video is flushed at any point, the step counter is reset to 0.
clean_on_close: whether to rename and prune trajectories when closed.
See `clean_trajectories` for details.
record_reward: whether to record the reward in the trajectory data
record_env_state: whether to record the environment state in the trajectory data
video_fps (int): The FPS of the video to generate if save_video is True
render_substeps (bool): Whether to render substeps for video. This is captures an image of the environment after each physics step. This runs slower but generates more image frames
per environment step which when coupled with a higher video FPS can yield a smoother video.
avoid_overwriting_video (bool): If true, the wrapper will iterate over possible video names to avoid overwriting existing videos in the output directory. Useful for resuming training runs.
source_type (Optional[str]): a word to describe the source of the actions used to record episodes (e.g. RL, motionplanning, teleoperation)
source_desc (Optional[str]): A longer description describing how the demonstrations are collected
"""
def __init__(
self,
env: BaseEnv,
output_dir: str,
save_trajectory: bool = True,
trajectory_name: Optional[str] = None,
save_video: bool = True,
info_on_video: bool = False,
save_on_reset: bool = True,
save_video_trigger: Optional[Callable[[int], bool]] = None,
max_steps_per_video: Optional[int] = None,
clean_on_close: bool = True,
record_reward: bool = True,
record_env_state: bool = True,
video_fps: int = 30,
render_substeps: bool = False,
avoid_overwriting_video: bool = False,
source_type: Optional[str] = None,
source_desc: Optional[str] = None,
) -> None:
super().__init__(env)
[docs] self.output_dir = Path(output_dir)
if save_trajectory or save_video:
self.output_dir.mkdir(parents=True, exist_ok=True)
[docs] self.video_fps = video_fps
[docs] self._elapsed_record_steps = 0
[docs] self.save_video_trigger = save_video_trigger
[docs] self._trajectory_buffer: Step = None
[docs] self.max_steps_per_video = max_steps_per_video
[docs] self.max_episode_steps = gym_utils.find_max_episode_steps_value(env)
[docs] self.save_on_reset = save_on_reset
[docs] self.save_trajectory = save_trajectory
if self.base_env.num_envs > 1 and save_video:
assert (
max_steps_per_video is not None
), "On GPU parallelized environments, \
there must be a given max steps per video value in order to flush videos in order \
to avoid issues caused by partial resets. If your environment does not do partial \
resets you may set max_steps_per_video equal to the max_episode_steps"
[docs] self.clean_on_close = clean_on_close
[docs] self.record_reward = record_reward
[docs] self.record_env_state = record_env_state
if self.save_trajectory:
if not trajectory_name:
trajectory_name = time.strftime("%Y%m%d_%H%M%S")
self._h5_file = h5py.File(self.output_dir / f"{trajectory_name}.h5", "w")
# Use a separate json to store non-array data
self._json_path = self._h5_file.filename.replace(".h5", ".json")
self._json_data = dict(
env_info=parse_env_info(self.env),
commit_info=get_commit_info(),
episodes=[],
)
if self._json_data["env_info"] is not None:
self._json_data["env_info"][
"max_episode_steps"
] = self.max_episode_steps
if source_type is not None:
self._json_data["source_type"] = source_type
if source_desc is not None:
self._json_data["source_desc"] = source_desc
[docs] self._save_video = save_video
[docs] self.info_on_video = info_on_video
[docs] self.render_images = []
[docs] self.video_nrows = int(np.sqrt(self.unwrapped.num_envs))
[docs] self._avoid_overwriting_video = avoid_overwriting_video
[docs] self._already_warned_about_state_dict_inconsistency = False
# check if wrapped env is already wrapped by a CPU gym wrapper
cur_env = self.env
[docs] self.cpu_wrapped_env = False
while cur_env is not None:
if isinstance(cur_env, CPUGymWrapper):
self.cpu_wrapped_env = True
break
if hasattr(cur_env, "env"):
cur_env = cur_env.env
else:
break
[docs] self.render_substeps = render_substeps
if self.render_substeps:
_original_after_simulation_step = self.base_env._after_simulation_step
def wrapped_after_simulation_step():
_original_after_simulation_step()
if self.save_video:
if self.base_env.gpu_sim_enabled:
self.base_env.scene._gpu_fetch_all()
self.render_images.append(self.capture_image())
self.base_env._after_simulation_step = wrapped_after_simulation_step
@property
[docs] def num_envs(self):
return self.base_env.num_envs
@property
[docs] def base_env(self) -> BaseEnv:
return self.env.unwrapped
@property
[docs] def save_video(self):
if not self._save_video:
return False
if self.save_video_trigger is not None:
return self.save_video_trigger(self._elapsed_record_steps)
else:
return self._save_video
[docs] def capture_image(self, infos=None):
img = self.env.render()
img = common.to_numpy(img)
if len(img.shape) == 3:
img = img[None]
if infos is not None:
for i in range(len(img)):
info_item = {
k: v if np.size(v) == 1 else v[i] for k, v in infos.items()
}
img[i] = put_info_on_image(img[i], info_item)
if len(img.shape) > 3:
if len(img) == 1:
img = img[0]
else:
img = tile_images(img, nrows=self.video_nrows)
return img
[docs] def reset(
self,
*args,
seed: Optional[Union[int, list[int]]] = None,
options: Optional[dict] = None,
save=True,
**kwargs,
):
if self.save_on_reset:
if self.save_video and self.num_envs == 1:
self.flush_video(save=save)
# if doing a full reset then we flush all trajectories including incompleted ones
if self._trajectory_buffer is not None:
if options is None or "env_idx" not in options:
self.flush_trajectory(
env_idxs_to_flush=np.arange(self.num_envs), save=save
)
else:
self.flush_trajectory(
env_idxs_to_flush=common.to_numpy(options["env_idx"]), save=save
)
obs, info = super().reset(*args, seed=seed, options=options, **kwargs)
if info["reconfigure"]:
# if we reconfigure, there is the possibility that state dictionary looks different now
# so trajectory buffer must be wiped
self._trajectory_buffer = None
if self.save_trajectory:
state_dict = self.base_env.get_state_dict()
action = common.batch(
self.env.get_wrapper_attr("single_action_space").sample()
)
# check if state_dict is consistent
if not sapien_utils.is_state_dict_consistent(state_dict):
self.record_env_state = False
if not self._already_warned_about_state_dict_inconsistency:
logger.warn(
f"State dictionary is not consistent, disabling recording of environment states for {self.env}"
)
self._already_warned_about_state_dict_inconsistency = True
first_step = Step(
state=None,
observation=common.to_numpy(common.batch(obs)),
# note first reward/action etc. are ignored when saving trajectories to disk
action=common.to_numpy(common.batch(action.repeat(self.num_envs, 0))),
reward=np.zeros(
(
1,
self.num_envs,
),
dtype=float,
),
# terminated and truncated are fixed to be True at the start to indicate the start of an episode.
# an episode is done when one of these is True otherwise the trajectory is incomplete / a partial episode
terminated=np.ones((1, self.num_envs), dtype=bool),
truncated=np.ones((1, self.num_envs), dtype=bool),
done=np.ones((1, self.num_envs), dtype=bool),
success=np.zeros((1, self.num_envs), dtype=bool),
fail=np.zeros((1, self.num_envs), dtype=bool),
env_episode_ptr=np.zeros((self.num_envs,), dtype=int),
)
if self.record_env_state:
first_step.state = common.to_numpy(common.batch(state_dict))
env_idx = np.arange(self.num_envs)
if options is not None and "env_idx" in options:
env_idx = common.to_numpy(options["env_idx"])
if self._trajectory_buffer is None:
# Initialize trajectory buffer on the first episode based on given observation (which should be generated after all wrappers)
self._trajectory_buffer = first_step
else:
def recursive_replace(x, y):
if isinstance(x, np.ndarray):
x[-1, env_idx] = y[-1, env_idx]
else:
for k in x.keys():
recursive_replace(x[k], y[k])
# TODO (stao): how do we store states from GPU sim of tasks with objects not in every sub-scene?
# Maybe we shouldn't?
if self.record_env_state:
recursive_replace(self._trajectory_buffer.state, first_step.state)
recursive_replace(
self._trajectory_buffer.observation, first_step.observation
)
recursive_replace(self._trajectory_buffer.action, first_step.action)
if self.record_reward:
recursive_replace(self._trajectory_buffer.reward, first_step.reward)
recursive_replace(
self._trajectory_buffer.terminated, first_step.terminated
)
recursive_replace(
self._trajectory_buffer.truncated, first_step.truncated
)
recursive_replace(self._trajectory_buffer.done, first_step.done)
if self._trajectory_buffer.success is not None:
recursive_replace(
self._trajectory_buffer.success, first_step.success
)
if self._trajectory_buffer.fail is not None:
recursive_replace(self._trajectory_buffer.fail, first_step.fail)
if options is not None and "env_idx" in options:
options["env_idx"] = common.to_numpy(options["env_idx"])
self.last_reset_kwargs = copy.deepcopy(dict(options=options, **kwargs))
if seed is not None:
self.last_reset_kwargs.update(seed=seed)
return obs, info
[docs] def step(self, action):
if self.save_video and self._video_steps == 0:
# save the first frame of the video here (s_0) instead of inside reset as user
# may call env.reset(...) multiple times but we want to ignore empty trajectories
self.render_images.append(self.capture_image())
obs, rew, terminated, truncated, info = super().step(action)
if self.save_trajectory:
state_dict = self.base_env.get_state_dict()
if self.record_env_state:
self._trajectory_buffer.state = common.append_dict_array(
self._trajectory_buffer.state,
common.to_numpy(common.batch(state_dict)),
)
self._trajectory_buffer.observation = common.append_dict_array(
self._trajectory_buffer.observation,
common.to_numpy(common.batch(obs)),
)
self._trajectory_buffer.action = common.append_dict_array(
self._trajectory_buffer.action,
common.to_numpy(common.batch(action)),
)
if self.record_reward:
self._trajectory_buffer.reward = common.append_dict_array(
self._trajectory_buffer.reward,
common.to_numpy(common.batch(rew)),
)
self._trajectory_buffer.terminated = common.append_dict_array(
self._trajectory_buffer.terminated,
common.to_numpy(common.batch(terminated)),
)
self._trajectory_buffer.truncated = common.append_dict_array(
self._trajectory_buffer.truncated,
common.to_numpy(common.batch(truncated)),
)
done = terminated | truncated
self._trajectory_buffer.done = common.append_dict_array(
self._trajectory_buffer.done,
common.to_numpy(common.batch(done)),
)
if "success" in info:
self._trajectory_buffer.success = common.append_dict_array(
self._trajectory_buffer.success,
common.to_numpy(common.batch(info["success"])),
)
else:
self._trajectory_buffer.success = None
if "fail" in info:
self._trajectory_buffer.fail = common.append_dict_array(
self._trajectory_buffer.fail,
common.to_numpy(common.batch(info["fail"])),
)
else:
self._trajectory_buffer.fail = None
if self.save_video:
self._video_steps += 1
if self.info_on_video:
scalar_info = gym_utils.extract_scalars_from_info(
common.to_numpy(info), batch_size=self.num_envs
)
scalar_info["reward"] = common.to_numpy(rew)
if np.size(scalar_info["reward"]) > 1:
scalar_info["reward"] = [
float(rew) for rew in scalar_info["reward"]
]
else:
scalar_info["reward"] = float(scalar_info["reward"])
image = self.capture_image(scalar_info)
else:
image = self.capture_image()
self.render_images.append(image)
if (
self.max_steps_per_video is not None
and self._video_steps >= self.max_steps_per_video
):
self.flush_video()
self._elapsed_record_steps += 1
return obs, rew, terminated, truncated, info
[docs] def flush_trajectory(
self,
verbose=False,
ignore_empty_transition=True,
env_idxs_to_flush=None,
save: bool = True,
):
"""
Flushes a trajectory and by default saves it to disk
Arguments:
verbose (bool): whether to print out information about the flushed trajectory
ignore_empty_transition (bool): whether to ignore trajectories that did not have any actions
env_idxs_to_flush: which environments by id to flush. If None, all environments are flushed.
save (bool): whether to save the trajectory to disk
"""
flush_count = 0
if env_idxs_to_flush is None:
env_idxs_to_flush = np.arange(0, self.num_envs)
for env_idx in env_idxs_to_flush:
start_ptr = self._trajectory_buffer.env_episode_ptr[env_idx]
end_ptr = len(self._trajectory_buffer.done)
if ignore_empty_transition and end_ptr - start_ptr <= 1:
continue
flush_count += 1
if save:
self._episode_id += 1
traj_id = "traj_{}".format(self._episode_id)
group = self._h5_file.create_group(traj_id, track_order=True)
def recursive_add_to_h5py(
group: h5py.Group, data: Union[dict, Array], key
):
"""simple recursive data insertion for nested data structures into h5py, optimizing for visual data as well"""
if isinstance(data, dict):
subgrp = group.create_group(key, track_order=True)
for k in data.keys():
recursive_add_to_h5py(subgrp, data[k], k)
else:
if key == "rgb":
# NOTE(jigu): It is more efficient to use gzip than png for a sequence of images.
group.create_dataset(
"rgb",
data=data[start_ptr:end_ptr, env_idx],
dtype=data.dtype,
compression="gzip",
compression_opts=5,
)
elif key == "depth":
# NOTE (stao): By default now cameras in ManiSkill return depth values of type uint16 for numpy
group.create_dataset(
key,
data=data[start_ptr:end_ptr, env_idx],
dtype=data.dtype,
compression="gzip",
compression_opts=5,
)
elif key == "seg":
group.create_dataset(
key,
data=data[start_ptr:end_ptr, env_idx],
dtype=data.dtype,
compression="gzip",
compression_opts=5,
)
else:
group.create_dataset(
key,
data=data[start_ptr:end_ptr, env_idx],
dtype=data.dtype,
)
# Observations need special processing
if isinstance(self._trajectory_buffer.observation, dict):
recursive_add_to_h5py(
group, self._trajectory_buffer.observation, "obs"
)
elif isinstance(self._trajectory_buffer.observation, np.ndarray):
if self.cpu_wrapped_env:
group.create_dataset(
"obs",
data=self._trajectory_buffer.observation[start_ptr:end_ptr],
dtype=self._trajectory_buffer.observation.dtype,
)
else:
group.create_dataset(
"obs",
data=self._trajectory_buffer.observation[
start_ptr:end_ptr, env_idx
],
dtype=self._trajectory_buffer.observation.dtype,
)
else:
raise NotImplementedError(
f"RecordEpisode wrapper does not know how to handle observation data of type {type(self._trajectory_buffer.observation)}"
)
episode_info = dict(
episode_id=self._episode_id,
episode_seed=self.base_env._episode_seed[env_idx],
control_mode=self.base_env.control_mode,
elapsed_steps=end_ptr - start_ptr - 1,
)
if self.num_envs == 1:
episode_info.update(reset_kwargs=self.last_reset_kwargs)
else:
# NOTE (stao): With multiple envs in GPU simulation, reset_kwargs do not make much sense
episode_info.update(reset_kwargs=dict())
# slice some data to remove the first dummy frame.
actions = common.index_dict_array(
self._trajectory_buffer.action,
(slice(start_ptr + 1, end_ptr), env_idx),
)
terminated = self._trajectory_buffer.terminated[
start_ptr + 1 : end_ptr, env_idx
]
truncated = self._trajectory_buffer.truncated[
start_ptr + 1 : end_ptr, env_idx
]
if isinstance(self._trajectory_buffer.action, dict):
recursive_add_to_h5py(group, actions, "actions")
else:
group.create_dataset("actions", data=actions, dtype=np.float32)
group.create_dataset("terminated", data=terminated, dtype=bool)
group.create_dataset("truncated", data=truncated, dtype=bool)
if self._trajectory_buffer.success is not None:
group.create_dataset(
"success",
data=self._trajectory_buffer.success[
start_ptr + 1 : end_ptr, env_idx
],
dtype=bool,
)
episode_info.update(
success=self._trajectory_buffer.success[end_ptr - 1, env_idx]
)
if self._trajectory_buffer.fail is not None:
group.create_dataset(
"fail",
data=self._trajectory_buffer.fail[
start_ptr + 1 : end_ptr, env_idx
],
dtype=bool,
)
episode_info.update(
fail=self._trajectory_buffer.fail[end_ptr - 1, env_idx]
)
if self.record_env_state:
recursive_add_to_h5py(
group, self._trajectory_buffer.state, "env_states"
)
if self.record_reward:
group.create_dataset(
"rewards",
data=self._trajectory_buffer.reward[
start_ptr + 1 : end_ptr, env_idx
],
dtype=np.float32,
)
self._json_data["episodes"].append(common.to_numpy(episode_info))
dump_json(self._json_path, self._json_data, indent=2)
if verbose:
if flush_count == 1:
print(f"Recorded episode {self._episode_id}")
else:
print(
f"Recorded episodes {self._episode_id - flush_count} to {self._episode_id}"
)
# truncate self._trajectory_buffer down to save memory
if flush_count > 0:
self._trajectory_buffer.env_episode_ptr[env_idxs_to_flush] = (
len(self._trajectory_buffer.done) - 1
)
min_env_ptr = self._trajectory_buffer.env_episode_ptr.min()
N = len(self._trajectory_buffer.done)
if self.record_env_state:
self._trajectory_buffer.state = common.index_dict_array(
self._trajectory_buffer.state, slice(min_env_ptr, N)
)
self._trajectory_buffer.observation = common.index_dict_array(
self._trajectory_buffer.observation, slice(min_env_ptr, N)
)
self._trajectory_buffer.action = common.index_dict_array(
self._trajectory_buffer.action, slice(min_env_ptr, N)
)
if self.record_reward:
self._trajectory_buffer.reward = common.index_dict_array(
self._trajectory_buffer.reward, slice(min_env_ptr, N)
)
self._trajectory_buffer.terminated = common.index_dict_array(
self._trajectory_buffer.terminated, slice(min_env_ptr, N)
)
self._trajectory_buffer.truncated = common.index_dict_array(
self._trajectory_buffer.truncated, slice(min_env_ptr, N)
)
self._trajectory_buffer.done = common.index_dict_array(
self._trajectory_buffer.done, slice(min_env_ptr, N)
)
if self._trajectory_buffer.success is not None:
self._trajectory_buffer.success = common.index_dict_array(
self._trajectory_buffer.success, slice(min_env_ptr, N)
)
if self._trajectory_buffer.fail is not None:
self._trajectory_buffer.fail = common.index_dict_array(
self._trajectory_buffer.fail, slice(min_env_ptr, N)
)
self._trajectory_buffer.env_episode_ptr -= min_env_ptr
[docs] def flush_video(
self,
name=None,
suffix="",
verbose=False,
ignore_empty_transition=True,
save: bool = True,
):
"""
Flush a video of the recorded episode(s) anb by default saves it to disk
Arguments:
name (str): name of the video file. If None, it will be named with the episode id.
suffix (str): suffix to add to the video file name
verbose (bool): whether to print out information about the flushed video
ignore_empty_transition (bool): whether to ignore trajectories that did not have any actions
save (bool): whether to save the video to disk
"""
if len(self.render_images) == 0:
return
if ignore_empty_transition and len(self.render_images) == 1:
return
if save:
self._video_id += 1
if name is None:
video_name = "{}".format(self._video_id)
if suffix:
video_name += "_" + suffix
if self._avoid_overwriting_video:
while (
Path(self.output_dir)
/ (video_name.replace(" ", "_").replace("\n", "_") + ".mp4")
).exists():
self._video_id += 1
video_name = "{}".format(self._video_id)
if suffix:
video_name += "_" + suffix
else:
video_name = name
images_to_video(
self.render_images,
str(self.output_dir),
video_name=video_name,
fps=self.video_fps,
verbose=verbose,
)
self._video_steps = 0
self.render_images = []
[docs] def close(self) -> None:
if self._closed:
# There is some strange bug when vector envs using record wrapper are closed/deleted, this code runs twice
return
self._closed = True
if self.save_trajectory:
# Handle the last episode only when `save_on_reset=True`
if self.save_on_reset and self._trajectory_buffer is not None:
self.flush_trajectory(
ignore_empty_transition=True,
env_idxs_to_flush=np.arange(self.num_envs),
)
if self.clean_on_close:
clean_trajectories(self._h5_file, self._json_data)
dump_json(self._json_path, self._json_data, indent=2)
self._h5_file.close()
if self.save_video:
if self.save_on_reset:
self.flush_video()
return super().close()