kitae.envs.wrappers package#

Submodules#

kitae.envs.wrappers.compatibility module#

class kitae.envs.wrappers.compatibility.EnvpoolCompatibility(env: Env[ObsType, ActType])#

Bases: Wrapper

reset(*, seed: int | None = None, options: dict[str, Any] | None = None) tuple[Any, dict[str, Any]]#

Uses the reset() of the env that can be overwritten to change the returned data.

class kitae.envs.wrappers.compatibility.SubProcVecParallelEnvCompatibility(env: Env[ObsType, ActType])#

Bases: Wrapper

reset(*, seed: int | None = None, options: dict[str, Any] | None = None) tuple[Any, dict[str, Any]]#

Uses the reset() of the env that can be overwritten to change the returned data.

kitae.envs.wrappers.record_episode_statistics module#

Wrapper that tracks the cumulative rewards and episode lengths.

class kitae.envs.wrappers.record_episode_statistics.ParallelRecordEpisodeStatistics(env: Env, deque_size: int = 100)#

Bases: Wrapper, RecordConstructorArgs

This wrapper will keep track of cumulative rewards and episode lengths.

At the end of an episode, the statistics of the episode will be added to info using the key episode. If using a vectorized environment also the key _episode is used which indicates whether the env at the respective index has the episode statistics.

After the completion of an episode, info will look like this:

>>> info = {
...     "episode": {
...         "r": "<cumulative reward>",
...         "l": "<episode length>",
...         "t": "<elapsed time since beginning of episode>"
...     },
... }

For a vectorized environments the output will be in the form of:

>>> infos = {
...     "final_observation": "<array of length num-envs>",
...     "_final_observation": "<boolean array of length num-envs>",
...     "final_info": "<array of length num-envs>",
...     "_final_info": "<boolean array of length num-envs>",
...     "episode": {
...         "r": "<array of cumulative reward>",
...         "l": "<array of episode length>",
...         "t": "<array of elapsed time since beginning of episode>"
...     },
...     "_episode": "<boolean array of length num-envs>"
... }

Moreover, the most recent rewards and episode lengths are stored in buffers that can be accessed via wrapped_env.return_queue and wrapped_env.length_queue respectively.

return_queue#

The cumulative rewards of the last deque_size-many episodes

length_queue#

The lengths of the last deque_size-many episodes

reset(**kwargs)#

Resets the environment using kwargs and resets the episode returns and lengths.

step(action)#

Steps through the environment, recording the episode statistics.

kitae.envs.wrappers.vector module#

class kitae.envs.wrappers.vector.CloudPickleWrapper(x)#

Bases: object

Uses cloudpickle to serialize contents

class kitae.envs.wrappers.vector.SubProcVecParallelEnv(env_fns: Callable[[Any], ParallelEnv])#

Bases: VecParallelEnv

VecEnv that runs multiple environments in parallel in subproceses and communicates with them via pipes. Recommended to use when num_envs > 1 and step() can be a bottleneck.

close()#
reset()#

Reset all the environments and return an array of observations, or a dict of observation arrays.

If step_async is still doing work, that work will be cancelled and step_wait() should not be called until step_async() is invoked again.

step_async(actions)#

Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step.

You should not call this if a step_async run is already pending.

step_wait()#

Wait for the step taken with step_async().

Returns (obs, rews, dones, infos):
  • obs: an array of observations, or a dict of

    arrays of observations.

  • rews: an array of rewards

  • dones: an array of “episode done” booleans

  • infos: a sequence of info objects

class kitae.envs.wrappers.vector.VecParallelEnv(num_envs: int, observation_spaces, action_spaces)#

Bases: ABC

An abstract asynchronous, vectorized environment

close()#
close_extras()#

Clean up the extra resources, beyond what’s in this base class. Only runs when not self.closed.

get_images()#

Return RGB images from each environment

get_wrapper_attr(name: str) Any#

Gets an attribute from the wrapper and lower environments if name doesn’t exist in this object.

Parameters:

name – The variable name to get

Returns:

The variable with name in wrapper or lower environments

render(mode='human')#
abstract reset()#

Reset all the environments and return an array of observations, or a dict of observation arrays.

If step_async is still doing work, that work will be cancelled and step_wait() should not be called until step_async() is invoked again.

step(actions)#

Step the environments synchronously.

This is available for backwards compatibility.

abstract step_async(actions)#

Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step.

You should not call this if a step_async run is already pending.

abstract step_wait()#

Wait for the step taken with step_async().

Returns (obs, rews, dones, infos):
  • obs: an array of observations, or a dict of

    arrays of observations.

  • rews: an array of rewards

  • dones: an array of “episode done” booleans

  • infos: a sequence of info objects

kitae.envs.wrappers.vector.stack(xs: list[dict[str, ndarray]])#
kitae.envs.wrappers.vector.tile_images(img_nhwc)#

Tile N images into one big PxQ image (P,Q) are chosen to be as close as possible, and if N is square, then P=Q.

input: img_nhwc, list or array of images, ndim=4 once turned into array

n = batch index, h = height, w = width, c = channel

Returns:

bigim_HWc, ndarray with ndim=3

kitae.envs.wrappers.vector.worker(child_conn: Connection, parent_conn: Connection, env_fn_wrapper)#

Module contents#