mate package

Subpackages

Submodules

mate.constants module

Constants of the Multi-Agent Tracking Environment.

mate.constants.TERRAIN_SIZE = 1000.0

Terrain size. The terrain is a 2000.0 by 2000.0 square.

mate.constants.TERRAIN_WIDTH = 2000.0

Terrain width. The terrain is a 2000.0 by 2000.0 square.

mate.constants.TERRAIN_SPACE = Box(-1000.0, 1000.0, (2,), float64)

The space object of the terrain. The terrain is a 2000.0 by 2000.0 square. (i.e., a square within range of \([-1000, +1000] \times [-1000, +1000]\) in cartesian coordinates.)

mate.constants.WAREHOUSES = array([[ 925.,  925.],        [-925.,  925.],        [-925., -925.],        [ 925., -925.]])

Center locations of the warehouses.

mate.constants.NUM_WAREHOUSES = 4

Number of warehouses.

mate.constants.WAREHOUSE_RADIUS = 75.0

Half width of the squared warehouses.

mate.constants.MAX_CAMERA_VIEWING_ANGLE = 180.0

Maximum viewing angle of cameras in degrees.

mate.constants.TARGET_RADIUS = 0.0

Radius of targets.

mate.constants.PRESERVED_SPACE = Box([    0.     0.     0.     0. -2000. -2000. -2000. -2000. -2000. -2000.  -2000. -2000.     0.], [  inf   inf   inf   inf 2000. 2000. 2000. 2000. 2000. 2000. 2000. 2000.  1000.], (13,), float64)

The space object of agent’s preserved data.

mate.constants.PRESERVED_DIM = 13

Preserved observation dimension, which holds the number of entities in the environment and the index of current agent.

mate.constants.OBSERVATION_OFFSET = 13

Preserved observation dimension, which holds the number of entities in the environment and the index of current agent.

mate.constants.CAMERA_STATE_DIM_PUBLIC = 6

Dimension of camera’s public state.

mate.constants.CAMERA_STATE_SPACE_PUBLIC = Box([-2000. -2000.     0. -2000. -2000.     0.], [2000. 2000. 1000. 2000. 2000.  180.], (6,), float64)

The space object of camera’s public state.

mate.constants.CAMERA_STATE_DIM_PRIVATE = 9

Dimension of camera’s private state.

mate.constants.CAMERA_STATE_SPACE_PRIVATE = Box([-2000. -2000.     0. -2000. -2000.     0.     0.     0.     0.], [2000. 2000. 1000. 2000. 2000.  180. 2000.  180.  180.], (9,), float64)

The space object of camera’s private state.

mate.constants.TARGET_STATE_DIM_PUBLIC = 4

Dimension of target’s public state.

mate.constants.TARGET_STATE_SPACE_PUBLIC = Box([-2.e+03 -2.e+03  0.e+00 -1.e+00], [2.e+03 2.e+03 2.e+03 1.e+00], (4,), float64)

The space object of target’s public state.

mate.constants.TARGET_STATE_DIM_PRIVATE = 14

Dimension of target’s private state.

mate.constants.TARGET_STATE_SPACE_PRIVATE = Box([-2.e+03 -2.e+03  0.e+00 -1.e+00  0.e+00  1.e+00  0.e+00  0.e+00  0.e+00   0.e+00 -1.e+00 -1.e+00 -1.e+00 -1.e+00], [2.e+03 2.e+03 2.e+03 1.e+00 2.e+03 2.e+00    inf    inf    inf    inf  1.e+00 1.e+00 1.e+00 1.e+00], (14,), float64)

The space object of target’s private state.

mate.constants.OBSTACLE_STATE_DIM = 3

Dimension of obstacle’s state.

mate.constants.OBSTACLE_STATE_SPACE = Box([-2000. -2000.     0.], [2000. 2000. 1000.], (3,), float64)

The space object of obstacle’s state.

mate.constants.CAMERA_ACTION_DIM = 2

Dimension of camera’s action.

mate.constants.CAMERA_DEFAULT_ACTION = array([0., 0.])

Default action of cameras.

mate.constants.TARGET_ACTION_DIM = 2

Dimension of target’s action.

mate.constants.TARGET_DEFAULT_ACTION = array([0., 0.])

Default action of targets.

mate.constants.camera_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box[source]

Get the space object of a single camera’s observation from the given number of entities.

mate.constants.target_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box[source]

Get the space object of a single target’s observation from the given number of entities.

mate.constants.observation_space_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) Box[source]

Get the space object of a single agent’s observation of the given team from the given number of entities.

mate.constants.camera_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The start indices of each part of the camera observation.

mate.constants.target_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The start indices of each part of the target observation.

mate.constants.observation_indices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The start indices of each part of the observation.

mate.constants.camera_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice][source]

The slices of each part of the camera observation.

mate.constants.target_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice][source]

The slices of each part of the target observation.

mate.constants.observation_slices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The slices of each part of the observation.

mate.constants.camera_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Get the bit mask from the given number of entities. The bit values is true if the corresponding entry in a single target’s observation is a coordinate value (exclude the current target itself).

mate.constants.target_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Get the bit mask from the given number of entities. The bit values is true if the corresponding entry in a single target’s observation is a coordinate value (exclude the current target itself).

mate.constants.coordinate_mask_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Get the bit mask from the given number of entities. The bit values is true if the corresponding entry in a single agent’s observation is a boolean value (include the current agent itself).

mate.environment module

The Multi-Agent Tracking Environment.

mate.environment.ASSETS_DIR = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets')

The asset directory path.

mate.environment.DEFAULT_CONFIG_FILE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets/MATE-4v8-9.yaml')

The default configuration file.

mate.environment.read_config(config_or_path: Dict[str, Any] | str | None = None, **kwargs) Dict[str, Any][source]

Load configuration from a dictionary mapping or a JSON/YAML file.

class mate.environment.EnvMeta[source]

Bases: type

Helper metaclass for instance check.

class mate.environment.MultiAgentTracking(config: Dict[str, Any] | str | None = None, **kwargs)[source]

Bases: Env, EzPickle

The main class of the Multi-Agent Tracking Environment. It encapsulates an environment with arbitrary behind-the-scenes dynamics. This environment is partially observed for both teams.

The main API methods that users of this class need to know are:

  • step

  • reset

  • render

  • close

  • seed

  • send_messages <- new method

  • receive_messages <- new method

  • load_config <- new method

And set the following attributes:

action_space: A tuple of two Space objects corresponding to valid joint actions of cameras and targets camera_action_space: The Space object corresponding to a single camera’s valid actions camera_joint_action_space: The Space object corresponding to valid joint actions of all cameras target_action_space: The Space object corresponding to a single target’s valid actions target_joint_action_space: The Space object corresponding to valid joint actions of all targets observation_space: A tuple of two Space objects corresponding to valid joint observations of cameras and targets camera_observation_space: The Space object corresponding to a single camera’s valid observations camera_joint_observation_space: The Space object corresponding to valid joint observations of all cameras target_observation_space: The Space object corresponding to a single target’s valid observations target_joint_observation_space: The Space object corresponding to valid joint observations of all targets

The methods are accessed publicly as “step”, “reset”, etc…

metadata: Dict[str, Any] = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 60, 'video.output_frames_per_second': 60}
DEFAULT_CONFIG_FILE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets/MATE-4v8-9.yaml')

The default configuration file.

__init__(config: Dict[str, Any] | str | None = None, **kwargs) None[source]

Initialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

Parameters:

config (Optional[Union[Dict[str, Any], str]]) – a dictionary mapping or a path to a readable JSON/YAML file

action_space: Space[ActType]
observation_space: Space[ObsType]
load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

Parameters:

config (Optional[Union[Dict[str, Any], str]]) – a dictionary mapping or a path to a readable JSON/YAML file

Examples

You can change the environment configuration without creating a new environment, and this will keep the wrappers you add.

>>> env = mate.make('MultiAgentTracking-v0', config='MATE-4v8-9.yaml')
>>> env = mate.MultiCamera(env, target_agent=mate.GreedyTargetAgent(seed=0))
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 8 targets, 9 obstacles)>
>>> env.load_config('MATE-4v2-9.yaml')
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 2 targets, 9 obstacles)>
step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]][source]

Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state.

Accepts a tuple of cameras’ joint action and targets’ joint action, and returns a tuple (observation, reward, done, info).

Parameters:

action (Tuple[np.ndarray, np.ndarray]) – a tuple of joint actions provided by the camera agents and the target agents

Returns:

a tuple of agent’s observation of the current environment reward (Tuple[float, float]): a tuple of the amount of reward returned after previous action done (bool): whether the episode has ended, in which case further step() calls will return undefined results info (Tuple[List[dict], List[dict]]): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)

Return type:

observation (Tuple[np.ndarray, np.ndarray])

reset(*, seed: int | None = None) Tuple[ndarray, ndarray][source]

Resets the environment to an initial state and returns an initial observation. The entities (cameras, targets and obstacles) may be shuffled if not explicitly disabled in configuration.

Note that unless an explicit seed is provided, this function would not reset the environment’s random number generator(s). Random variables in the environment’s state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes.

Parameters:

seed (int) – the seed for the random number generator(s)

Returns:

the initial observations of all cameras and targets.

Return type:

observations (Tuple[numpy.ndarray, np.ndarray])

send_messages(messages: Message | Iterable[Message]) None[source]

Buffer the messages from an agent to others in the same team.

The environment will send the messages to recipients’ through method receive_messages(), and also info field of step() results.

receive_messages(agent_id: Tuple[Team, int] | None = None, agent: AgentType | None = None) Tuple[List[List[Message]], List[List[Message]]] | List[Message][source]

Retrieve the messages to recipients. If no agent is specified, this method will return all the messages to all agents in the environment.

The environment will also put the messages to recipients’ info field of step() results.

state() ndarray[source]

The global state of the environment.

joint_observation() Tuple[ndarray, ndarray][source]

Joint observations of both teams.

render(mode: str = 'human', window_size: int = 800, onetime_callbacks: Iterable[Callable[[MultiAgentTracking, str], None]] = ()) bool | ndarray[source]

Render the environment.

The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:

  • human: render to the current display or terminal and return nothing. Usually for human consumption.

  • rgb_array: Return an numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.

Parameters:
  • mode (str) – the mode to render with

  • window_size (int) – the width and height of the render window (only valid for the first call)

  • onetime_callbacks (Iterable[callable]) – callback functions for the rendering results

add_render_callback(name: str, callback: Callable[[MultiAgentTracking, str], None]) None[source]

Add a callback function to the render function.

This is useful to add additional elements to the rendering results.

close() None[source]

Perform necessary cleanup.

Environments will automatically close() themselves when garbage collected or when the program exits.

seed(seed: int | None = None) List[int][source]

Set the seed for this environment’s random number generators.

Note

Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.

Returns:

the list of seeds used in this environment’s random

number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.

Return type:

list<bigint>

property np_random: RandomState

The main random number generator of the environment.

route_messages(messages: List[Message]) List[Message][source]

Convert broadcast messages to peer-to-peer forms.

property name: str

Name of the environment.

property max_episode_steps: int

Maximum number of episode steps.

property camera_min_viewing_angle: float

Minimum viewing angle of cameras in degrees.

property camera_max_sight_range: float

Maximum sight range of cameras.

property camera_rotation_step: float

Maximum rotation step of cameras in degrees.

property camera_zooming_step: float

Maximum zooming step of cameras in degrees.

property target_step_size: float

Maximum step size of targets.

property target_sight_range: float

Sight range of targets.

property num_cargoes_per_target: int

Average number of cargoes per target.

property targets_start_with_cargoes: bool

Always assign cargoes to the target at the beginning of an episode.

property bounty_factor: float

The ratio of the maximum bounty reward over the freight reward.

property obstacle_transmittance: float

Transmittance coefficient of obstacles.

property shuffle_entities: bool

Whether or not to shuffle entity IDs when reset the environment.

property num_warehouses: int

Number of warehouses.

property num_cameras: int

Number of camera(s) in the environment.

property num_targets: int

Number of target(s) in the environment.

property num_obstacles: int

Number of obstacle(s) in the environment.

property high_capacity_target_split: float

Population ratio of the high-capacity target in the target team.

property num_high_capacity_targets: float

Number of high-capacity target(s) in the target team.

property num_low_capacity_targets: float

Number of low-capacity target(s) in the target team.

property camera_observation_dim: int

Dimension of single camera observation.

property target_observation_dim: int

Dimension of single target observation.

mate.utils module

Utility functions for the Multi-Agent Tracking Environment.

mate.utils.seed_everything(seed: int) None[source]

Set the seed for global random number generators.

mate.utils.RAD2DEG = 57.29577951308232

Coefficient that converts the radian number to the equivalent number in degrees. The actual value is \(\frac{180}{\pi}\).

mate.utils.DEG2RAD = 0.017453292519943295

Coefficient that converts the number degrees number to the radian equivalent. The actual value is \(\frac{\pi}{180}\).

mate.utils.sin_deg(x)[source]

Trigonometric sine in degrees, element-wise.

\[\sin_{\text{deg}} ( x ) = \sin \left( \frac{\pi}{180} x \right)\]
mate.utils.cos_deg(x)[source]

Trigonometric cosine in degrees, element-wise.

\[\cos_{\text{deg}} ( x ) = \cos \left( \frac{\pi}{180} x \right)\]
mate.utils.tan_deg(x)[source]

Trigonometric tangent in degrees, element-wise.

\[\tan_{\text{deg}} ( x ) = \tan \left( \frac{\pi}{180} x \right)\]
mate.utils.arcsin_deg(x)[source]

Trigonometric inverse sine in degrees, element-wise.

\[\arcsin_{\text{deg}} ( x ) = \frac{180}{\pi} \arcsin ( x )\]
mate.utils.arccos_deg(x)[source]

Trigonometric inverse cosine in degrees, element-wise.

\[\arccos_{\text{deg}} ( x ) = \frac{180}{\pi} \arcsin ( x )\]
mate.utils.arctan2_deg(y, x)[source]

Element-wise arc tangent of y/x in degrees.

\[\operatorname{arctan2}_{\text{deg}} ( y, x ) = \frac{180}{\pi} \arctan \left( \frac{y}{x} \right)\]
mate.utils.cartesian2polar(x, y)[source]

Convert cartesian coordinates to polar coordinates in degrees, element-wise.

\[\operatorname{cartesian2polar} ( x, y ) = \left( \sqrt{x^2 + y^2}, \operatorname{arctan2}_{\text{deg}} ( y, x ) \right)\]
mate.utils.polar2cartesian(rho, phi)[source]

Convert polar coordinates to cartesian coordinates in degrees, element-wise.

\[\operatorname{polar2cartesian} ( \rho, \phi ) = \left( \rho \cos_{\text{deg}} ( \phi ), \rho \sin_{\text{deg}} ( \phi ) \right)\]
mate.utils.normalize_angle(angle)[source]

Normalize a angle in degree to \([-180, +180)\).

class mate.utils.Vector2D(vector=None, norm=None, angle=None, origin=None)[source]

Bases: object

2D Vector.

property vector
property x
property y
property endpoint
property angle
property norm
copy()[source]
class mate.utils.Team(value)[source]

Bases: Enum

Enumeration of teams.

CAMERA = 0
TARGET = 1
class mate.utils.Message(sender: int, recipient: int | None, content: Any, team: Team, broadcasting: bool = False)[source]

Bases: object

Message class for communication between agents in the same team.

sender: int

Agent index of the sender.

recipient: int | None

Agent index of the recipient, leave None for broadcasting.

content: Any

Message content.

team: Team

String to indicate the team of agents.

broadcasting: bool = False

Whether or not to broadcast to all teammates.

Module contents

MATE: The Multi-Agent Tracking Environment.

mate.make(id: str | EnvSpec, max_episode_steps: int | None = None, autoreset: bool = False, apply_api_compatibility: bool | None = None, disable_env_checker: bool | None = None, **kwargs) Env[source]

Create an environment according to the given ID.

To find all available environments use gym.envs.registry.keys() for all valid ids.

Parameters:
  • id – Name of the environment. Optionally, a module to import can be included, eg. ‘module:Env-v0’

  • max_episode_steps – Maximum length of an episode (TimeLimit wrapper).

  • autoreset – Whether to automatically reset the environment after each episode (AutoResetWrapper).

  • apply_api_compatibility – Whether to wrap the environment with the StepAPICompatibility wrapper that converts the environment step from a done bool to return termination and truncation bools. By default, the argument is None to which the environment specification apply_api_compatibility is used which defaults to False. Otherwise, the value of apply_api_compatibility is used. If True, the wrapper is applied otherwise, the wrapper is not applied.

  • disable_env_checker – If to run the env checker, None will default to the environment specification disable_env_checker (which is by default False, running the environment checker), otherwise will run according to this parameter (True = not run, False = run)

  • kwargs – Additional arguments to pass to the environment constructor.

Returns:

An instance of the environment.

Raises:

Error – If the id doesn’t exist then an error is raised

mate.camera_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box[source]

Get the space object of a single camera’s observation from the given number of entities.

mate.target_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box[source]

Get the space object of a single target’s observation from the given number of entities.

mate.observation_space_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) Box[source]

Get the space object of a single agent’s observation of the given team from the given number of entities.

mate.camera_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The start indices of each part of the camera observation.

mate.target_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The start indices of each part of the target observation.

mate.observation_indices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The start indices of each part of the observation.

mate.camera_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice][source]

The slices of each part of the camera observation.

mate.target_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice][source]

The slices of each part of the target observation.

mate.observation_slices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

The slices of each part of the observation.

mate.camera_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Get the bit mask from the given number of entities. The bit values is true if the corresponding entry in a single target’s observation is a coordinate value (exclude the current target itself).

mate.target_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Get the bit mask from the given number of entities. The bit values is true if the corresponding entry in a single target’s observation is a coordinate value (exclude the current target itself).

mate.coordinate_mask_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Get the bit mask from the given number of entities. The bit values is true if the corresponding entry in a single agent’s observation is a boolean value (include the current agent itself).

mate.read_config(config_or_path: Dict[str, Any] | str | None = None, **kwargs) Dict[str, Any][source]

Load configuration from a dictionary mapping or a JSON/YAML file.

class mate.EnvMeta[source]

Bases: type

Helper metaclass for instance check.

class mate.MultiAgentTracking(config: Dict[str, Any] | str | None = None, **kwargs)[source]

Bases: Env, EzPickle

The main class of the Multi-Agent Tracking Environment. It encapsulates an environment with arbitrary behind-the-scenes dynamics. This environment is partially observed for both teams.

The main API methods that users of this class need to know are:

  • step

  • reset

  • render

  • close

  • seed

  • send_messages <- new method

  • receive_messages <- new method

  • load_config <- new method

And set the following attributes:

action_space: A tuple of two Space objects corresponding to valid joint actions of cameras and targets camera_action_space: The Space object corresponding to a single camera’s valid actions camera_joint_action_space: The Space object corresponding to valid joint actions of all cameras target_action_space: The Space object corresponding to a single target’s valid actions target_joint_action_space: The Space object corresponding to valid joint actions of all targets observation_space: A tuple of two Space objects corresponding to valid joint observations of cameras and targets camera_observation_space: The Space object corresponding to a single camera’s valid observations camera_joint_observation_space: The Space object corresponding to valid joint observations of all cameras target_observation_space: The Space object corresponding to a single target’s valid observations target_joint_observation_space: The Space object corresponding to valid joint observations of all targets

The methods are accessed publicly as “step”, “reset”, etc…

metadata: Dict[str, Any] = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 60, 'video.output_frames_per_second': 60}
DEFAULT_CONFIG_FILE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets/MATE-4v8-9.yaml')

The default configuration file.

__init__(config: Dict[str, Any] | str | None = None, **kwargs) None[source]

Initialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

Parameters:

config (Optional[Union[Dict[str, Any], str]]) – a dictionary mapping or a path to a readable JSON/YAML file

action_space: Space[ActType]
observation_space: Space[ObsType]
load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

Parameters:

config (Optional[Union[Dict[str, Any], str]]) – a dictionary mapping or a path to a readable JSON/YAML file

Examples

You can change the environment configuration without creating a new environment, and this will keep the wrappers you add.

>>> env = mate.make('MultiAgentTracking-v0', config='MATE-4v8-9.yaml')
>>> env = mate.MultiCamera(env, target_agent=mate.GreedyTargetAgent(seed=0))
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 8 targets, 9 obstacles)>
>>> env.load_config('MATE-4v2-9.yaml')
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 2 targets, 9 obstacles)>
step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]][source]

Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state.

Accepts a tuple of cameras’ joint action and targets’ joint action, and returns a tuple (observation, reward, done, info).

Parameters:

action (Tuple[np.ndarray, np.ndarray]) – a tuple of joint actions provided by the camera agents and the target agents

Returns:

a tuple of agent’s observation of the current environment reward (Tuple[float, float]): a tuple of the amount of reward returned after previous action done (bool): whether the episode has ended, in which case further step() calls will return undefined results info (Tuple[List[dict], List[dict]]): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)

Return type:

observation (Tuple[np.ndarray, np.ndarray])

reset(*, seed: int | None = None) Tuple[ndarray, ndarray][source]

Resets the environment to an initial state and returns an initial observation. The entities (cameras, targets and obstacles) may be shuffled if not explicitly disabled in configuration.

Note that unless an explicit seed is provided, this function would not reset the environment’s random number generator(s). Random variables in the environment’s state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes.

Parameters:

seed (int) – the seed for the random number generator(s)

Returns:

the initial observations of all cameras and targets.

Return type:

observations (Tuple[numpy.ndarray, np.ndarray])

send_messages(messages: Message | Iterable[Message]) None[source]

Buffer the messages from an agent to others in the same team.

The environment will send the messages to recipients’ through method receive_messages(), and also info field of step() results.

receive_messages(agent_id: Tuple[Team, int] | None = None, agent: AgentType | None = None) Tuple[List[List[Message]], List[List[Message]]] | List[Message][source]

Retrieve the messages to recipients. If no agent is specified, this method will return all the messages to all agents in the environment.

The environment will also put the messages to recipients’ info field of step() results.

state() ndarray[source]

The global state of the environment.

joint_observation() Tuple[ndarray, ndarray][source]

Joint observations of both teams.

render(mode: str = 'human', window_size: int = 800, onetime_callbacks: Iterable[Callable[[MultiAgentTracking, str], None]] = ()) bool | ndarray[source]

Render the environment.

The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:

  • human: render to the current display or terminal and return nothing. Usually for human consumption.

  • rgb_array: Return an numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.

Parameters:
  • mode (str) – the mode to render with

  • window_size (int) – the width and height of the render window (only valid for the first call)

  • onetime_callbacks (Iterable[callable]) – callback functions for the rendering results

add_render_callback(name: str, callback: Callable[[MultiAgentTracking, str], None]) None[source]

Add a callback function to the render function.

This is useful to add additional elements to the rendering results.

close() None[source]

Perform necessary cleanup.

Environments will automatically close() themselves when garbage collected or when the program exits.

seed(seed: int | None = None) List[int][source]

Set the seed for this environment’s random number generators.

Note

Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.

Returns:

the list of seeds used in this environment’s random

number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.

Return type:

list<bigint>

property np_random: RandomState

The main random number generator of the environment.

route_messages(messages: List[Message]) List[Message][source]

Convert broadcast messages to peer-to-peer forms.

property name: str

Name of the environment.

property max_episode_steps: int

Maximum number of episode steps.

property camera_min_viewing_angle: float

Minimum viewing angle of cameras in degrees.

property camera_max_sight_range: float

Maximum sight range of cameras.

property camera_rotation_step: float

Maximum rotation step of cameras in degrees.

property camera_zooming_step: float

Maximum zooming step of cameras in degrees.

property target_step_size: float

Maximum step size of targets.

property target_sight_range: float

Sight range of targets.

property num_cargoes_per_target: int

Average number of cargoes per target.

property targets_start_with_cargoes: bool

Always assign cargoes to the target at the beginning of an episode.

property bounty_factor: float

The ratio of the maximum bounty reward over the freight reward.

property obstacle_transmittance: float

Transmittance coefficient of obstacles.

property shuffle_entities: bool

Whether or not to shuffle entity IDs when reset the environment.

property num_warehouses: int

Number of warehouses.

property num_cameras: int

Number of camera(s) in the environment.

property num_targets: int

Number of target(s) in the environment.

property num_obstacles: int

Number of obstacle(s) in the environment.

property high_capacity_target_split: float

Population ratio of the high-capacity target in the target team.

property num_high_capacity_targets: float

Number of high-capacity target(s) in the target team.

property num_low_capacity_targets: float

Number of low-capacity target(s) in the target team.

property camera_observation_dim: int

Dimension of single camera observation.

property target_observation_dim: int

Dimension of single target observation.

class mate.EnhancedObservation(env: BaseEnvironmentType, team: Literal['both', 'camera', 'target', 'none'] = 'both')[source]

Bases: ObservationWrapper

Enhance the agent’s observation, which sets all observation mask to True. The targets can observe the empty status of all warehouses even when far away.

load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

observation(observation: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray][source]

Returns a modified observation.

class mate.SharedFieldOfView(env: BaseEnvironmentType, team: Literal['both', 'camera', 'target', 'none'] = 'both')[source]

Bases: ObservationWrapper

Share field of view among agents in the same team, which applies the “or” operator over the observation masks. The target agents also share the empty status of warehouses.

load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

observation(observation: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray][source]

Returns a modified observation.

class mate.RescaledObservation(env: MultiAgentTracking)[source]

Bases: ObservationWrapper

Rescale all entity states in the observation to [-1., +1.]. (Not used in the evaluation script.)

observation(observation: Tuple[ndarray, ndarray] | ndarray) Tuple[ndarray, ndarray] | ndarray[source]

Returns a modified observation.

rescale_observation(observation: ndarray, team: Team) ndarray[source]
class mate.RelativeCoordinates(env: MultiAgentTracking)[source]

Bases: ObservationWrapper

Convert all locations of other entities in the observation to relative coordinates (exclude the current agent itself). (Not used in the evaluation script.)

observation(observation: Tuple[ndarray, ndarray] | ndarray) Tuple[ndarray, ndarray] | ndarray[source]

Returns a modified observation.

convert_coordinates(observation: ndarray, team: Team) ndarray[source]
class mate.MoreTrainingInformation(env: BaseEnvironmentType)[source]

Bases: Wrapper

Add more environment and agent information to the info field of step(), enabling full observability of the environment. (Not used in the evaluation script.)

step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]] | Tuple[Tuple[ndarray, ndarray], Tuple[List[float], List[float]], Tuple[List[bool], List[bool]], Tuple[List[dict], List[dict]]][source]

Steps through the environment with action.

class mate.DiscreteCamera(env: BaseEnvironmentType, levels: int = 5)[source]

Bases: ActionWrapper

Wrap the environment to allow cameras to use discrete actions.

load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

action(action: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray][source]

Convert joint action of cameras from discrete to continuous.

reverse_action(action: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray][source]

Convert joint action of cameras from continuous to discrete.

static discrete_action_grid(levels)[source]
class mate.DiscreteTarget(env: BaseEnvironmentType, levels: int = 5)[source]

Bases: ActionWrapper

Wrap the environment to allow targets to use discrete actions.

load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

reset(**kwargs) Tuple[ndarray, ndarray][source]

Resets the environment with kwargs.

action(action: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray][source]

Convert joint action of targets from discrete to continuous.

reverse_action(action: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray][source]

Convert joint action of targets from continuous to discrete.

static discrete_action_grid(levels)[source]
class mate.AuxiliaryCameraRewards(env: BaseEnvironmentType | MultiCamera | MultiTarget, coefficients: Dict[str, float | Callable[[int, int, int, float, float], float]], reduction: Literal['mean', 'sum', 'max', 'min', 'none'] = 'none')[source]

Bases: Wrapper

Add additional auxiliary rewards for each individual camera. (Not used in the evaluation script.)

The auxiliary reward is a weighted sum of the following components:

  • raw_reward (the higher the better): team reward returned by the environment (shared, range in \((-\infty, 0]\)).

  • coverage_rate (the higher the better): coverage rate of all targets in the environment (shared, range in \([0, 1]\)).

  • real_coverage_rate (the higher the better): coverage rate of targets with cargoes in the environment (shared, range in \([0, 1]\)).

  • mean_transport_rate (the lower the better): mean transport rate of the target team (shared, range in \([0, 1]\)).

  • soft_coverage_score (the higher the better): soft coverage score is proportional to the distance from the target to the camera’s boundary (individual, range in \([-1, N_{\mathcal{T}}]\)).

  • num_tracked (the higher the better): number of targets tracked the camera (shared, range in \([0, N_{\mathcal{T}}]\)).

  • baseline: constant \(1\).

ACCEPTABLE_KEYS = ('raw_reward', 'coverage_rate', 'real_coverage_rate', 'mean_transport_rate', 'soft_coverage_score', 'num_tracked', 'baseline')
REDUCERS = {'max': <function amax>, 'mean': <function mean>, 'min': <function amin>, 'sum': <function sum>}
reset(**kwargs) ndarray[source]

Resets the environment with kwargs.

step(action: Tuple[ndarray, ndarray] | ndarray) Tuple[Tuple[ndarray, ndarray], Tuple[List[float], List[float]], Tuple[List[bool], List[bool]], Tuple[List[dict], List[dict]]] | Tuple[ndarray, List[float], List[bool], List[dict]][source]

Steps through the environment with action.

static compute_soft_coverage_scores(env) ndarray[source]

Compute all soft coverage score for each individual camera.

static compute_soft_coverage_score(camera, targets, tracked_bits: ndarray) List[float][source]

The soft coverage score is proportional to the distance from the target to the camera’s boundary.

class mate.AuxiliaryTargetRewards(env: BaseEnvironmentType | MultiCamera | MultiTarget, coefficients: Dict[str, float | Callable[[int, int, int, float, float], float]], reduction: Literal['mean', 'sum', 'max', 'min', 'none'] = 'none')[source]

Bases: Wrapper

Add additional auxiliary rewards for each individual target. (Not used in the evaluation script.)

The auxiliary reward is a weighted sum of the following components:

  • raw_reward (the higher the better): team reward returned by the environment (shared, range in \([0, +\infty)\)).

  • coverage_rate (the lower the better): coverage rate of all targets in the environment (shared, range in \([0, 1]\)).

  • real_coverage_rate (the lower the better): coverage rate of targets with cargoes in the environment (shared, range in \([0, 1]\)).

  • mean_transport_rate (the higher the better): mean transport rate of the target team (shared, range in \([0, 1]\)).

  • normalized_goal_distance (the lower the better): the normalized value of the distance to destination, or the nearest non-empty warehouse when the target is not loaded (individual, range in \([0, \sqrt{2}]\)).

  • sparse_delivery (the higher the better): a boolean value that indicates whether the target reaches the destination (individual, range in \({0, 1}\)).

  • soft_coverage_score (the lower the better): soft coverage score is proportional to the distance from the target to the camera’s boundary (individual, range in \([-1, N_{\mathcal{C}}]\)).

  • is_tracked (the lower the better): a boolean value that indicates whether the target is tracked by any camera or not. (individual, range in \({0, 1}\)).

  • is_colliding (the lower the better): a boolean value that indicates whether the target is colliding with obstacles, cameras’ barriers of terrain boundary. (individual, range in \({0, 1}\)).

  • baseline: constant \(1\).

ACCEPTABLE_KEYS = ('raw_reward', 'coverage_rate', 'real_coverage_rate', 'mean_transport_rate', 'normalized_goal_distance', 'sparse_delivery', 'soft_coverage_score', 'is_tracked', 'is_colliding', 'baseline')
REDUCERS = {'max': <function amax>, 'mean': <function mean>, 'min': <function amin>, 'sum': <function sum>}
reset(**kwargs) ndarray[source]

Resets the environment with kwargs.

step(action: Tuple[ndarray, ndarray] | ndarray) Tuple[Tuple[ndarray, ndarray], Tuple[List[float], List[float]], Tuple[List[bool], List[bool]], Tuple[List[dict], List[dict]]] | Tuple[ndarray, List[float], List[bool], List[dict]][source]

Steps through the environment with action.

mate.group_reset(agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray]) None[source]

Reset a group of agents.

mate.group_step(env: BaseEnvironmentType, agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray], infos: List[dict] | None = None, deterministic: bool | None = None) List[int | ndarray][source]

Helper function to do a environment step for a group of agents.

mate.group_observe(agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray], infos: List[dict] | None = None) List[int | ndarray][source]

Set the observation for a group of agents.

mate.group_communicate(env: BaseEnvironmentType, agents: Iterable[AgentType]) None[source]

Send and receive messages from a group of agents to the environment.

mate.group_act(agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray], infos: List[dict] | None = None, deterministic: bool | None = None) List[int | ndarray][source]

Get the joint action of a group of agents.

class mate.MultiCamera(env: BaseEnvironmentType, target_agent: TargetAgentBase)[source]

Bases: SingleTeamMultiAgent

Wrap the environment into a single-team multi-agent environment that users can use the Gym API to train and/or evaluate their camera agents.

class mate.SingleCamera(env: BaseEnvironmentType, other_camera_agent: CameraAgentBase, target_agent: TargetAgentBase)[source]

Bases: SingleTeamSingleAgent

Wrap the environment to a single-team single-agent environment that users can use the Gym API to train and/or evaluate their camera agent.

class mate.MultiTarget(env: BaseEnvironmentType, camera_agent: CameraAgentBase)[source]

Bases: SingleTeamMultiAgent

Wraps the environment into a single-team multi-agent environment that users can use the Gym API to train and/or evaluate their target agents.

class mate.SingleTarget(env: BaseEnvironmentType, other_target_agent: TargetAgentBase, camera_agent: CameraAgentBase)[source]

Bases: SingleTeamSingleAgent

Wrap the environment to a single-team single-agent environment that users can use the Gym API to train and/or evaluate their target agent.

class mate.MessageFilter(env: MultiAgentTracking, filter: Callable[[MultiAgentTracking, Message], bool])[source]

Bases: Wrapper

Filter messages from agents of intra-team communications. (Not used in the evaluation script.)

Users can use this wrapper to implement a communication channel with limited bandwidth, limited communication range, or random dropout. This wrapper can be applied multiple times with different filter functions.

Note

The filter function can also modify the message content. Users can use this to add channel signal noises etc.

send_messages(messages: Message | Iterable[Message]) None[source]

Buffer the messages from an agent to others in the same team.

The environment will send the messages to recipients’ through method receive_messages(), and also info field of step() results.

class mate.RestrictedCommunicationRange(env: MultiAgentTracking, range_limit: float)[source]

Bases: MessageFilter

Add a restricted communication range to channels. (Not used in the evaluation script.)

static filter(env: MultiAgentTracking, message: Message, range_limit: float) bool[source]

Filter out messages beyond range limit.

class mate.RandomMessageDropout(env: MultiAgentTracking, dropout_rate: float)[source]

Bases: MessageFilter

Randomly drop messages in communication channels. (Not used in the evaluation script.)

static filter(env: MultiAgentTracking, message: Message, dropout_rate: float) bool[source]

Randomly drop messages.

class mate.NoCommunication(env: MultiAgentTracking, team: Literal['both', 'camera', 'target', 'none'] = 'both')[source]

Bases: MessageFilter

Disable intra-team communications, i.e., filter out all messages.

class mate.ExtraCommunicationDelays(env: MultiAgentTracking, delay: int | Callable[[MultiAgentTracking, Message], int] = 3)[source]

Bases: Wrapper

Add extra message delays to communication channels. (Not used in the evaluation script.)

Users can use this wrapper to implement a communication channel with random delays.

reset(**kwargs) Tuple[ndarray, ndarray] | ndarray[source]

Resets the environment with kwargs.

send_messages(messages: Message | Iterable[Message]) None[source]

Buffer the messages from an agent to others in the same team.

The environment will send the messages to recipients’ through method receive_messages(), and also info field of step() results.

class mate.RenderCommunication(env: MultiAgentTracking, duration: int | None = 20)[source]

Bases: Wrapper

Draw arrows for intra-team communications in rendering results.

load_config(config: Dict[str, Any] | str | None = None) None[source]

Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.

reset(**kwargs) Tuple[ndarray, ndarray][source]

Resets the environment with kwargs.

step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]][source]

Steps through the environment with action.

callback(unwrapped: MultiAgentTracking, mode: str) None[source]

Draw communication messages as arrows.

class mate.RepeatedRewardIndividualDone(env: BaseEnvironmentType | MultiCamera | MultiTarget, target_done_at_destination=False)[source]

Bases: Wrapper

Repeat the reward field and assign individual done field of step(), which is similar to the OpenAI Multi-Agent Particle Environment. (Not used in the evaluation script.)

step(action: Tuple[ndarray, ndarray] | ndarray) Tuple[Tuple[ndarray, ndarray], Tuple[List[float], List[float]], Tuple[List[bool], List[bool]], Tuple[List[dict], List[dict]]] | Tuple[ndarray, List[float], List[bool], List[dict]][source]

Steps through the environment with action.

mate.WrapperMeta

alias of EnvMeta

class mate.WrapperSpec(wrapper, *args, **kwargs)[source]

Bases: object

Helper class for creating environments with wrappers.

class mate.CameraAgentBase(seed: int | None = None)[source]

Bases: AgentBase

Base class for camera agents.

TEAM: Team = 0
DEFAULT_ACTION: int | ndarray = array([0., 0.])
STATE_CLASS

alias of CameraStatePrivate

TEAMMATE_STATE_CLASS

alias of CameraStatePublic

OPPONENT_STATE_CLASS

alias of TargetStatePublic

property num_teammates: int

Number of agents in the same team, including the current agent.

property num_opponents: int

Number of adversarial agents in the opponent team.

class mate.TargetAgentBase(seed: int | None = None)[source]

Bases: AgentBase

Base class for target agents.

TEAM: Team = 1
DEFAULT_ACTION: int | ndarray = array([0., 0.])
STATE_CLASS

alias of TargetStatePrivate

TEAMMATE_STATE_CLASS

alias of TargetStatePublic

OPPONENT_STATE_CLASS

alias of CameraStatePublic

property num_teammates: int

Number of agents in the same team, including the current agent.

property num_opponents: int

Number of adversarial agents in the opponent team.

class mate.RandomCameraAgent(seed=None, frame_skip=20)[source]

Bases: CameraAgentBase

Random Camera Agent

Random action.

__init__(seed=None, frame_skip=20)[source]

Initialize the agent. This function will be called only once on initialization.

Note

Agents can obtain the number of teammates and opponents on reset, but not here. You are responsible for writing scalable policies and code to handle this.

reset(observation)[source]

Reset the agent. This function will be called immediately after env.reset().

Note

observation is a 1D array, not a 2D array with an additional dimension for agent indices.

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Random action.

class mate.RandomTargetAgent(seed=None, frame_skip=20)[source]

Bases: TargetAgentBase

Random Target Agent

Random action.

__init__(seed=None, frame_skip=20)[source]

Initialize the agent. This function will be called only once on initialization.

Note

Agents can obtain the number of teammates and opponents on reset, but not here. You are responsible for writing scalable policies and code to handle this.

reset(observation)[source]

Reset the agent. This function will be called immediately after env.reset().

Note

observation is a 1D array, not a 2D array with an additional dimension for agent indices.

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Random action.

class mate.NaiveCameraAgent(seed: int | None = None)[source]

Bases: CameraAgentBase

Naive Camera Agent

Rotates anti-clockwise with the maximum viewing angle.

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Rotate anti-clockwise with the maximum viewing angle.

class mate.NaiveTargetAgent(seed=None)[source]

Bases: TargetAgentBase

Naive Target Agent

Visits all warehouses in turn.

__init__(seed=None)[source]

Initialize the agent. This function will be called only once on initialization.

property goal_location

Location of the current warehouse.

reset(observation)[source]

Reset the agent. This function will be called immediately after env.reset().

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Visit all warehouses in turn.

class mate.GreedyCameraAgent(seed=None, memory_period=25, filterout_unloaded=False, filterout_beyond_range=True)[source]

Bases: CameraAgentBase

Greedy Camera Agent

Arbitrarily tracks the nearest target. If no target found, use previous action or generate a new random action.

__init__(seed=None, memory_period=25, filterout_unloaded=False, filterout_beyond_range=True)[source]

Initialize the agent. This function will be called only once on initialization.

reset(observation)[source]

Reset the agent. This function will be called immediately after env.reset().

observe(observation, info=None)[source]

The agent observe the environment before sending messages. This function will be called before send_responses().

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Arbitrarily track the nearest target. If no target found, use previous action or generate a new random action.

process_messages(observation, messages)[source]

Process observation and prepare messages to teammates.

act_from_target_states(target_states)[source]

Place the selected target at the center of the field of view.

send_responses()[source]

Prepare messages to communicate with other agents in the same team. This function will be called before receive_responses().

Send the newest target states to teammates if necessary.

receive_responses(messages)[source]

Receive messages from other agents in the same team. This function will be called after send_responses() but before act().

Receive and process messages from teammates.

class mate.GreedyTargetAgent(seed=None, noise_scale=0.5)[source]

Bases: TargetAgentBase

Greedy Target Agent

Arbitrarily runs towards the destination (desired warehouse) with some noise.

__init__(seed=None, noise_scale=0.5)[source]

Initialize the agent. This function will be called only once on initialization.

property goal

Index of the current warehouse.

property goal_location

Location of the current warehouse.

reset(observation)[source]

Reset the agent. This function will be called immediately after env.reset().

observe(observation, info=None)[source]

The agent observe the environment before sending messages. This function will be called before send_responses().

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Arbitrarily run towards the warehouse with some noise.

process_messages(observation, messages)[source]

Process observation and prepare messages to teammates.

send_responses()[source]

Prepare messages to communicate with other agents in the same team. This function will be called before receive_responses().

Send indices of non-empty warehouses to teammate if necessary.

receive_responses(messages)[source]

Receive messages from other agents in the same team. This function will be called after send_responses() but before act().

Receive and process messages from teammates.

class mate.HeuristicCameraAgent(seed=None)[source]

Bases: CameraAgentBase

Heuristic Camera Agent

Greedily maximizes the heuristic scores as much as possible. All agents send their observations to the centralized controller (agent 0). Then the central controller sends the goal state (camera pose) to each agent.

__init__(seed=None)[source]

Initialize the agent. This function will be called only once on initialization.

reset(observation)[source]

Reset the agent. This function will be called immediately after env.reset().

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Arbitrarily track the nearest target. If no target found, use previous action or generate a new random action.

send_requests()[source]

Prepare messages to communicate with other agents in the same team. This function will be called after observe() but before receive_requests().

receive_requests(messages)[source]

Receive messages from other agents in the same team. This function will be called after send_requests().

send_responses()[source]

Prepare messages to communicate with other agents in the same team. This function will be called after receive_requests().

receive_responses(messages)[source]

Receive messages from other agents in the same team. This function will be called after send_responses() but before act().

get_joint_goal_state(camera_states, target_states)[source]

Greedily track the targets as many as possible.

static calculate_scores(max_sight_range, min_viewing_angle)[source]

Calculates the coordinate grid and the weighted scores.

class mate.HeuristicTargetAgent(seed=None, noise_scale=0.5)[source]

Bases: GreedyTargetAgent

Heuristic Target Agent

Arbitrarily runs towards the destination (desired warehouse) with some noise. In addition to the greedy target agent, a drift speed is added, which escapes away from the center of cameras’ field of view.

act(observation, info=None, deterministic=None)[source]

Get the agent action by the observation. This function will be called before every env.step().

Arbitrarily run towards the warehouse with some noise.

class mate.MixtureCameraAgent(candidates, weights=None, mixture_seed=None, seed=None)[source]

Bases: MixtureAgentMixIn, CameraAgentBase

Mixture Camera Agent

Randomly choose a underlying camera agent in candidates at episode start.

class mate.MixtureTargetAgent(candidates, weights=None, mixture_seed=None, seed=None)[source]

Bases: MixtureAgentMixIn, TargetAgentBase

Mixture Target Agent

Randomly choose a underlying target agent in candidates at episode start.

mate.convert_coordinates(observation: ndarray, team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Convert all locations of other entities in the observation to relative coordinates (exclude the current agent itself).

mate.normalize_observation(observation: ndarray, observation_space: Box, additional_mask: ndarray | None = None) ndarray[source]

Rescale all entity states in the observation to [-1., +1.].

mate.rescale_observation(observation: ndarray, team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray[source]

Rescale all entity states in the observation to [-1., +1.].

mate.split_observation(observation: ndarray, team: Team, num_cameras: int, num_targets: int, num_obstacles: int) Tuple[ndarray, ndarray, ndarray, ndarray, ndarray][source]

Splits the concatenated observations into parts.

class mate.CameraStatePublic(state: ndarray, index: int)[source]

Bases: StateBase

DIM = 6
property radius: float | ndarray
property sight_range: float | ndarray
property orientation: float | ndarray
property viewing_angle: float | ndarray
class mate.CameraStatePrivate(state: ndarray, index: int)[source]

Bases: CameraStatePublic

DIM = 9
property max_sight_range: float | ndarray
property min_viewing_angle: float | ndarray
property rotation_step: float | ndarray
property zooming_step: float | ndarray
property action_space: Box
class mate.TargetStatePublic(state: ndarray, index: int)[source]

Bases: StateBase

DIM = 4
property sight_range: float | ndarray
property is_loaded: bool | ndarray
class mate.TargetStatePrivate(state: ndarray, index: int)[source]

Bases: StateBase

DIM = 14
property step_size: float | ndarray
property capacity: float | ndarray
property goal_bits: ndarray
property empty_bits: ndarray
property action_space: Box
class mate.ObstacleState(state: ndarray, index: int)[source]

Bases: StateBase

DIM = 3
property radius: float | ndarray
mate.seed_everything(seed: int) None[source]

Set the seed for global random number generators.

mate.sin_deg(x)[source]

Trigonometric sine in degrees, element-wise.

\[\sin_{\text{deg}} ( x ) = \sin \left( \frac{\pi}{180} x \right)\]
mate.cos_deg(x)[source]

Trigonometric cosine in degrees, element-wise.

\[\cos_{\text{deg}} ( x ) = \cos \left( \frac{\pi}{180} x \right)\]
mate.tan_deg(x)[source]

Trigonometric tangent in degrees, element-wise.

\[\tan_{\text{deg}} ( x ) = \tan \left( \frac{\pi}{180} x \right)\]
mate.arcsin_deg(x)[source]

Trigonometric inverse sine in degrees, element-wise.

\[\arcsin_{\text{deg}} ( x ) = \frac{180}{\pi} \arcsin ( x )\]
mate.arccos_deg(x)[source]

Trigonometric inverse cosine in degrees, element-wise.

\[\arccos_{\text{deg}} ( x ) = \frac{180}{\pi} \arcsin ( x )\]
mate.arctan2_deg(y, x)[source]

Element-wise arc tangent of y/x in degrees.

\[\operatorname{arctan2}_{\text{deg}} ( y, x ) = \frac{180}{\pi} \arctan \left( \frac{y}{x} \right)\]
mate.cartesian2polar(x, y)[source]

Convert cartesian coordinates to polar coordinates in degrees, element-wise.

\[\operatorname{cartesian2polar} ( x, y ) = \left( \sqrt{x^2 + y^2}, \operatorname{arctan2}_{\text{deg}} ( y, x ) \right)\]
mate.polar2cartesian(rho, phi)[source]

Convert polar coordinates to cartesian coordinates in degrees, element-wise.

\[\operatorname{polar2cartesian} ( \rho, \phi ) = \left( \rho \cos_{\text{deg}} ( \phi ), \rho \sin_{\text{deg}} ( \phi ) \right)\]
mate.normalize_angle(angle)[source]

Normalize a angle in degree to \([-180, +180)\).

class mate.Vector2D(vector=None, norm=None, angle=None, origin=None)[source]

Bases: object

2D Vector.

property vector
property x
property y
property endpoint
property angle
property norm
copy()[source]
class mate.Team(value)[source]

Bases: Enum

Enumeration of teams.

CAMERA = 0
TARGET = 1
class mate.Message(sender: int, recipient: int | None, content: Any, team: Team, broadcasting: bool = False)[source]

Bases: object

Message class for communication between agents in the same team.

sender: int

Agent index of the sender.

recipient: int | None

Agent index of the recipient, leave None for broadcasting.

content: Any

Message content.

team: Team

String to indicate the team of agents.

broadcasting: bool = False

Whether or not to broadcast to all teammates.