kitae.envs.wrappers package#
Submodules#
kitae.envs.wrappers.compatibility module#
- class kitae.envs.wrappers.compatibility.EnvpoolCompatibility(env: Env[ObsType, ActType])#
Bases:
Wrapper
- class kitae.envs.wrappers.compatibility.SubProcVecParallelEnvCompatibility(env: Env[ObsType, ActType])#
Bases:
Wrapper
kitae.envs.wrappers.record_episode_statistics module#
Wrapper that tracks the cumulative rewards and episode lengths.
- class kitae.envs.wrappers.record_episode_statistics.ParallelRecordEpisodeStatistics(env: Env, deque_size: int = 100)#
Bases:
Wrapper, RecordConstructorArgs
This wrapper will keep track of cumulative rewards and episode lengths.
At the end of an episode, the statistics of the episode will be added to info using the key episode. If using a vectorized environment, the key _episode is also used; it indicates whether the env at the respective index has the episode statistics.
After the completion of an episode, info will look like this:
>>> info = {
...     "episode": {
...         "r": "<cumulative reward>",
...         "l": "<episode length>",
...         "t": "<elapsed time since beginning of episode>"
...     },
... }
For a vectorized environment, the output will be in the form of:
>>> infos = {
...     "final_observation": "<array of length num-envs>",
...     "_final_observation": "<boolean array of length num-envs>",
...     "final_info": "<array of length num-envs>",
...     "_final_info": "<boolean array of length num-envs>",
...     "episode": {
...         "r": "<array of cumulative reward>",
...         "l": "<array of episode length>",
...         "t": "<array of elapsed time since beginning of episode>"
...     },
...     "_episode": "<boolean array of length num-envs>"
... }
Moreover, the most recent rewards and episode lengths are stored in buffers that can be accessed via wrapped_env.return_queue and wrapped_env.length_queue respectively.
- return_queue#
The cumulative rewards of the last deque_size-many episodes
- length_queue#
The lengths of the last deque_size-many episodes
- reset(**kwargs)#
Resets the environment using kwargs and resets the episode returns and lengths.
- step(action)#
Steps through the environment, recording the episode statistics.
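The bookkeeping described for this wrapper can be sketched without any environment library. This is a minimal illustration, not the kitae implementation: EpisodeStatsSketch, CountdownEnv and run_episode are hypothetical names, and the toy env is assumed to return a four-tuple (obs, reward, done, info) from step, which may differ from the interface the real wrapper expects.

```python
import time
from collections import deque


class CountdownEnv:
    """Hypothetical toy env: reward 1.0 per step, episode ends after `horizon` steps."""

    def __init__(self, horizon=3):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t, {}

    def step(self, action):
        self.t += 1
        done = self.t >= self.horizon
        return self.t, 1.0, done, {}


class EpisodeStatsSketch:
    """Hypothetical stand-in for the wrapper: accumulates return and length."""

    def __init__(self, env, deque_size=100):
        self.env = env
        # Bounded buffers: only the last `deque_size` episodes are kept.
        self.return_queue = deque(maxlen=deque_size)
        self.length_queue = deque(maxlen=deque_size)
        self._start_counters()

    def _start_counters(self):
        self.episode_return = 0.0
        self.episode_length = 0
        self.episode_start = time.perf_counter()

    def reset(self):
        self._start_counters()
        return self.env.reset()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.episode_return += reward
        self.episode_length += 1
        if done:
            # Attach the statistics under the "episode" key, then start over.
            info = dict(info)
            info["episode"] = {
                "r": self.episode_return,
                "l": self.episode_length,
                "t": round(time.perf_counter() - self.episode_start, 6),
            }
            self.return_queue.append(self.episode_return)
            self.length_queue.append(self.episode_length)
            self._start_counters()
        return obs, reward, done, info


def run_episode(env):
    """Step a wrapped env until it reports done and return the final info."""
    env.reset()
    done, info = False, {}
    while not done:
        _, _, done, info = env.step(action=None)
    return info
```

Calling run_episode(EpisodeStatsSketch(CountdownEnv())) returns an info dict whose "episode" entry holds the return, length and elapsed time of that episode, while return_queue and length_queue accumulate the same values across episodes.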
kitae.envs.wrappers.vector module#
- class kitae.envs.wrappers.vector.CloudPickleWrapper(x)#
Bases:
object
Uses cloudpickle to serialize contents
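Wrappers of this kind typically exist so that environment factories, which are often lambdas or closures that the standard pickle module cannot serialize, can be sent to worker processes. The following is a sketch of that common pattern, not the kitae source: PickleBoxSketch and roundtrip are hypothetical names, cloudpickle is assumed to be installed, and the fallback to pickle is only there so the sketch still runs without it.

```python
import pickle

try:
    import cloudpickle  # third-party; assumed to be available next to kitae
except ImportError:  # fallback keeps the sketch runnable, but cannot handle lambdas
    cloudpickle = None


class PickleBoxSketch:
    """Hypothetical wrapper that serializes its content with cloudpickle."""

    def __init__(self, x):
        self.x = x

    def __getstate__(self):
        # Called when the box itself is pickled: serialize the content eagerly.
        dumps = cloudpickle.dumps if cloudpickle is not None else pickle.dumps
        return dumps(self.x)

    def __setstate__(self, state):
        # cloudpickle emits standard pickle streams, so pickle.loads can read them.
        self.x = pickle.loads(state)


def roundtrip(box):
    """Rebuild a box from its serialized state, as unpickling would."""
    clone = PickleBoxSketch.__new__(PickleBoxSketch)
    clone.__setstate__(box.__getstate__())
    return clone
```

With cloudpickle present, roundtrip(PickleBoxSketch(lambda: 42)).x() would be expected to return 42; with the pickle fallback the same call fails, which is the limitation the wrapper is meant to avoid.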
- class kitae.envs.wrappers.vector.SubProcVecParallelEnv(env_fns: Callable[[Any], ParallelEnv])#
Bases:
VecParallelEnv
VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes. Recommended when num_envs > 1 and step() can be a bottleneck.
- close()#
- reset()#
Reset all the environments and return an array of observations, or a dict of observation arrays.
If step_async is still doing work, that work will be cancelled and step_wait() should not be called until step_async() is invoked again.
- step_async(actions)#
Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step.
You should not call this if a step_async run is already pending.
- step_wait()#
Wait for the step taken with step_async().
- Returns (obs, rews, dones, infos):
  - obs: an array of observations, or a dict of arrays of observations
  - rews: an array of rewards
  - dones: an array of “episode done” booleans
  - infos: a sequence of info objects
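The step_async / step_wait split over pipes can be illustrated with a small command loop. This is a sketch under stated assumptions rather than the kitae code: EchoEnv, worker_sketch and PipeVecEnvSketch are hypothetical names, the ("reset" | "step" | "close", data) message format is invented for the example, and a thread stands in for each subprocess so the snippet stays self-contained.

```python
import threading
from multiprocessing import Pipe


class EchoEnv:
    """Hypothetical toy env: the observation is the action plus a fixed offset."""

    def __init__(self, offset):
        self.offset = offset

    def reset(self):
        return self.offset

    def step(self, action):
        return action + self.offset, 1.0, False, {}


def worker_sketch(conn, env):
    """Serve commands arriving on the pipe until 'close' is received."""
    while True:
        cmd, data = conn.recv()
        if cmd == "reset":
            conn.send(env.reset())
        elif cmd == "step":
            conn.send(env.step(data))
        elif cmd == "close":
            conn.close()
            break


class PipeVecEnvSketch:
    def __init__(self, envs):
        self.waiting = False
        self.parent_conns, self.workers = [], []
        for env in envs:
            parent_conn, child_conn = Pipe()
            # A thread replaces the subprocess here (assumption for the sketch).
            worker = threading.Thread(
                target=worker_sketch, args=(child_conn, env), daemon=True
            )
            worker.start()
            self.parent_conns.append(parent_conn)
            self.workers.append(worker)

    def reset(self):
        for conn in self.parent_conns:
            conn.send(("reset", None))
        return [conn.recv() for conn in self.parent_conns]

    def step_async(self, actions):
        if self.waiting:
            raise RuntimeError("a step_async call is already pending")
        # Dispatch every action first so all workers step concurrently.
        for conn, action in zip(self.parent_conns, actions):
            conn.send(("step", action))
        self.waiting = True

    def step_wait(self):
        results = [conn.recv() for conn in self.parent_conns]
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return list(obs), list(rews), list(dones), list(infos)

    def close(self):
        for conn in self.parent_conns:
            conn.send(("close", None))
        for worker in self.workers:
            worker.join()
        for conn in self.parent_conns:
            conn.close()
```

The point of the split is that step_async returns as soon as the actions are sent, so the caller can do other work before blocking in step_wait.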
- class kitae.envs.wrappers.vector.VecParallelEnv(num_envs: int, observation_spaces, action_spaces)#
Bases:
ABC
An abstract asynchronous, vectorized environment
- close()#
- close_extras()#
Clean up the extra resources, beyond what’s in this base class. Only runs when not self.closed.
- get_images()#
Return RGB images from each environment
- get_wrapper_attr(name: str) → Any#
Gets an attribute from the wrapper and lower environments if name doesn’t exist in this object.
- Parameters:
name – The variable name to get
- Returns:
The variable with name in wrapper or lower environments
- render(mode='human')#
- abstract reset()#
Reset all the environments and return an array of observations, or a dict of observation arrays.
If step_async is still doing work, that work will be cancelled and step_wait() should not be called until step_async() is invoked again.
- step(actions)#
Step the environments synchronously.
This is available for backwards compatibility.
- abstract step_async(actions)#
Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step.
You should not call this if a step_async run is already pending.
- abstract step_wait()#
Wait for the step taken with step_async().
- Returns (obs, rews, dones, infos):
  - obs: an array of observations, or a dict of arrays of observations
  - rews: an array of rewards
  - dones: an array of “episode done” booleans
  - infos: a sequence of info objects
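How the synchronous step() relates to the two abstract methods can be sketched as follows. The assumption, not confirmed by this page, is that step() simply calls step_async() and then step_wait(); VecEnvSketch and InProcessVecEnv are hypothetical names used only for illustration.

```python
from abc import ABC, abstractmethod


class VecEnvSketch(ABC):
    """Hypothetical reduction of the interface to its three stepping methods."""

    @abstractmethod
    def step_async(self, actions):
        """Start a step in every environment."""

    @abstractmethod
    def step_wait(self):
        """Block until the pending step finishes and return its results."""

    def step(self, actions):
        # Synchronous convenience: start the step, then wait for it.
        self.step_async(actions)
        return self.step_wait()


class InProcessVecEnv(VecEnvSketch):
    """Toy subclass: each 'environment' just doubles its action."""

    def __init__(self, num_envs):
        self.num_envs = num_envs
        self._pending = None

    def step_async(self, actions):
        if self._pending is not None:
            raise RuntimeError("a step_async call is already pending")
        self._pending = list(actions)

    def step_wait(self):
        actions, self._pending = self._pending, None
        obs = [2 * a for a in actions]
        rews = [0.0] * self.num_envs
        dones = [False] * self.num_envs
        infos = [{} for _ in range(self.num_envs)]
        return obs, rews, dones, infos
```

A subclass only has to provide the asynchronous pair; callers that do not need overlap can keep using step(actions) and receive the same (obs, rews, dones, infos) tuple.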
- kitae.envs.wrappers.vector.stack(xs: list[dict[str, ndarray]])#
- kitae.envs.wrappers.vector.tile_images(img_nhwc)#
Tile N images into one big PxQ image. (P, Q) are chosen to be as close as possible, and if N is a perfect square, then P=Q.
- input: img_nhwc, list or array of images, ndim=4 once turned into array
n = batch index, h = height, w = width, c = channel
- Returns:
bigim_HWc, ndarray with ndim=3
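One way to realize this tiling with NumPy is sketched below. It is an illustration, not the kitae source: tile_images_sketch is a hypothetical name, P is taken as the number of grid rows and Q as the number of grid columns, and unused grid cells are assumed to be filled with black (all-zero) images.

```python
import numpy as np


def tile_images_sketch(img_nhwc):
    """Tile N images of shape (h, w, c) into a single (P*h, Q*w, c) image."""
    img_nhwc = np.asarray(img_nhwc)
    n, h, w, c = img_nhwc.shape
    rows = int(np.ceil(np.sqrt(n)))  # P: grid rows
    cols = int(np.ceil(n / rows))    # Q: grid columns
    # Pad with all-zero images so that the grid is completely filled.
    padding = np.zeros((rows * cols - n, h, w, c), dtype=img_nhwc.dtype)
    padded = np.concatenate([img_nhwc, padding], axis=0)
    grid = padded.reshape(rows, cols, h, w, c)
    # (P, Q, h, w, c) -> (P, h, Q, w, c) -> (P*h, Q*w, c)
    return grid.transpose(0, 2, 1, 3, 4).reshape(rows * h, cols * w, c)
```

For four images of shape (2, 3, 1) this yields a 2x2 grid with shape (4, 6, 1); for five images the grid becomes 3x2 and the last cell stays black.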
- kitae.envs.wrappers.vector.worker(child_conn: Connection, parent_conn: Connection, env_fn_wrapper)#