mate package
Subpackages
- mate.agents package
- Submodules
- mate.agents.base module
AgentBase
AgentBase.TEAM
AgentBase.DEFAULT_ACTION
AgentBase.STATE_CLASS
AgentBase.TEAMMATE_STATE_CLASS
AgentBase.OPPONENT_STATE_CLASS
AgentBase.__init__()
AgentBase.action_space
AgentBase.observation_space
AgentBase.num_teammates
AgentBase.num_opponents
AgentBase.num_adversaries
AgentBase.clone()
AgentBase.spawn()
AgentBase.np_random
AgentBase.seed()
AgentBase.reset()
AgentBase.observe()
AgentBase.act()
AgentBase.predict()
AgentBase.__call__()
AgentBase.send_requests()
AgentBase.receive_requests()
AgentBase.send_responses()
AgentBase.receive_responses()
AgentBase.check_inputs()
AgentBase.pack_message()
AgentBase.get_teammate_state()
AgentBase.get_teammate_states()
AgentBase.get_opponent_state()
AgentBase.get_all_opponent_states()
AgentBase.get_obstacle_state()
AgentBase.get_all_obstacle_states()
CameraAgentBase
TargetAgentBase
- mate.agents.random module
- mate.agents.naive module
- mate.agents.greedy module
- mate.agents.heuristic module
HeuristicCameraAgent
HeuristicCameraAgent.__init__()
HeuristicCameraAgent.reset()
HeuristicCameraAgent.act()
HeuristicCameraAgent.send_requests()
HeuristicCameraAgent.receive_requests()
HeuristicCameraAgent.send_responses()
HeuristicCameraAgent.receive_responses()
HeuristicCameraAgent.get_joint_goal_state()
HeuristicCameraAgent.calculate_scores()
HeuristicTargetAgent
- mate.agents.mixture module
- mate.agents.utils module
- Module contents
CameraAgentBase
TargetAgentBase
RandomCameraAgent
RandomTargetAgent
NaiveCameraAgent
NaiveTargetAgent
GreedyCameraAgent
GreedyTargetAgent
HeuristicCameraAgent
HeuristicCameraAgent.__init__()
HeuristicCameraAgent.reset()
HeuristicCameraAgent.act()
HeuristicCameraAgent.send_requests()
HeuristicCameraAgent.receive_requests()
HeuristicCameraAgent.send_responses()
HeuristicCameraAgent.receive_responses()
HeuristicCameraAgent.get_joint_goal_state()
HeuristicCameraAgent.calculate_scores()
HeuristicTargetAgent
MixtureCameraAgent
MixtureTargetAgent
convert_coordinates()
normalize_observation()
rescale_observation()
split_observation()
CameraStatePublic
CameraStatePrivate
TargetStatePublic
TargetStatePrivate
ObstacleState
- mate.assets package
- mate.wrappers package
Submodules
mate.constants module
Constants of the Multi-Agent Tracking Environment.
- mate.constants.TERRAIN_SIZE = 1000.0
Half-width of the terrain. The terrain is a 2000.0 by 2000.0 square.
- mate.constants.TERRAIN_WIDTH = 2000.0
Terrain width. The terrain is a 2000.0 by 2000.0 square.
- mate.constants.TERRAIN_SPACE = Box(-1000.0, 1000.0, (2,), float64)
The space object of the terrain. The terrain is a 2000.0 by 2000.0 square, i.e., the square \([-1000, +1000] \times [-1000, +1000]\) in Cartesian coordinates.
- mate.constants.WAREHOUSES = array([[ 925., 925.], [-925., 925.], [-925., -925.], [ 925., -925.]])
Center locations of the warehouses.
- mate.constants.NUM_WAREHOUSES = 4
Number of warehouses.
- mate.constants.WAREHOUSE_RADIUS = 75.0
Half-width of the square warehouses.
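The warehouse geometry above (four center locations plus a common half-width) reduces to a simple containment check. The sketch below hardcodes the values from this page; the helper name `warehouse_index_of` is illustrative and not part of the mate API.

```python
from typing import Optional

# Warehouse centers and half-width, copied from mate.constants above.
WAREHOUSES = [(925.0, 925.0), (-925.0, 925.0), (-925.0, -925.0), (925.0, -925.0)]
WAREHOUSE_RADIUS = 75.0  # half-width of each square warehouse

def warehouse_index_of(x: float, y: float) -> Optional[int]:
    """Return the index of the warehouse containing (x, y), or None.

    A square warehouse contains a point when the Chebyshev distance to
    its center is at most the half-width.
    """
    for index, (cx, cy) in enumerate(WAREHOUSES):
        if max(abs(x - cx), abs(y - cy)) <= WAREHOUSE_RADIUS:
            return index
    return None
```

For example, `warehouse_index_of(950.0, 900.0)` falls inside the first warehouse, while the terrain center lies in none of them.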
- mate.constants.MAX_CAMERA_VIEWING_ANGLE = 180.0
Maximum viewing angle of cameras in degrees.
- mate.constants.TARGET_RADIUS = 0.0
Radius of targets.
- mate.constants.PRESERVED_SPACE = Box([ 0. 0. 0. 0. -2000. -2000. -2000. -2000. -2000. -2000. -2000. -2000. 0.], [ inf inf inf inf 2000. 2000. 2000. 2000. 2000. 2000. 2000. 2000. 1000.], (13,), float64)
The space object of agent’s preserved data.
- mate.constants.PRESERVED_DIM = 13
Preserved observation dimension, which holds the number of entities in the environment and the index of current agent.
- mate.constants.OBSERVATION_OFFSET = 13
Preserved observation dimension, which holds the number of entities in the environment and the index of current agent.
- mate.constants.CAMERA_STATE_DIM_PUBLIC = 6
Dimension of camera’s public state.
- mate.constants.CAMERA_STATE_SPACE_PUBLIC = Box([-2000. -2000. 0. -2000. -2000. 0.], [2000. 2000. 1000. 2000. 2000. 180.], (6,), float64)
The space object of camera’s public state.
- mate.constants.CAMERA_STATE_DIM_PRIVATE = 9
Dimension of camera’s private state.
- mate.constants.CAMERA_STATE_SPACE_PRIVATE = Box([-2000. -2000. 0. -2000. -2000. 0. 0. 0. 0.], [2000. 2000. 1000. 2000. 2000. 180. 2000. 180. 180.], (9,), float64)
The space object of camera’s private state.
- mate.constants.TARGET_STATE_DIM_PUBLIC = 4
Dimension of target’s public state.
- mate.constants.TARGET_STATE_SPACE_PUBLIC = Box([-2.e+03 -2.e+03 0.e+00 -1.e+00], [2.e+03 2.e+03 2.e+03 1.e+00], (4,), float64)
The space object of target’s public state.
- mate.constants.TARGET_STATE_DIM_PRIVATE = 14
Dimension of target’s private state.
- mate.constants.TARGET_STATE_SPACE_PRIVATE = Box([-2.e+03 -2.e+03 0.e+00 -1.e+00 0.e+00 1.e+00 0.e+00 0.e+00 0.e+00 0.e+00 -1.e+00 -1.e+00 -1.e+00 -1.e+00], [2.e+03 2.e+03 2.e+03 1.e+00 2.e+03 2.e+00 inf inf inf inf 1.e+00 1.e+00 1.e+00 1.e+00], (14,), float64)
The space object of target’s private state.
- mate.constants.OBSTACLE_STATE_DIM = 3
Dimension of obstacle’s state.
- mate.constants.OBSTACLE_STATE_SPACE = Box([-2000. -2000. 0.], [2000. 2000. 1000.], (3,), float64)
The space object of obstacle’s state.
- mate.constants.CAMERA_ACTION_DIM = 2
Dimension of camera’s action.
- mate.constants.CAMERA_DEFAULT_ACTION = array([0., 0.])
Default action of cameras.
- mate.constants.TARGET_ACTION_DIM = 2
Dimension of target’s action.
- mate.constants.TARGET_DEFAULT_ACTION = array([0., 0.])
Default action of targets.
- mate.constants.camera_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box [source]
Get the space object of a single camera’s observation from the given number of entities.
- mate.constants.target_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box [source]
Get the space object of a single target’s observation from the given number of entities.
- mate.constants.observation_space_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) Box [source]
Get the space object of a single agent’s observation of the given team from the given number of entities.
- mate.constants.camera_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The start indices of each part of the camera observation.
- mate.constants.target_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The start indices of each part of the target observation.
- mate.constants.observation_indices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The start indices of each part of the observation.
- mate.constants.camera_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice] [source]
The slices of each part of the camera observation.
- mate.constants.target_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice] [source]
The slices of each part of the target observation.
- mate.constants.observation_slices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The slices of each part of the observation.
- mate.constants.camera_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Get the bit mask from the given number of entities. A bit is true if the corresponding entry in a single camera's observation is a coordinate value (excluding the current camera itself).
- mate.constants.target_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Get the bit mask from the given number of entities. A bit is true if the corresponding entry in a single target's observation is a coordinate value (excluding the current target itself).
- mate.constants.coordinate_mask_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Get the bit mask from the given number of entities. A bit is true if the corresponding entry in a single agent's observation is a coordinate value (including the current agent itself).
mate.environment module
The Multi-Agent Tracking Environment.
- mate.environment.ASSETS_DIR = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets')
The asset directory path.
- mate.environment.DEFAULT_CONFIG_FILE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets/MATE-4v8-9.yaml')
The default configuration file.
- mate.environment.read_config(config_or_path: Dict[str, Any] | str | None = None, **kwargs) Dict[str, Any] [source]
Load configuration from a dictionary mapping or a JSON/YAML file.
- class mate.environment.MultiAgentTracking(config: Dict[str, Any] | str | None = None, **kwargs)[source]
Bases: Env, EzPickle
The main class of the Multi-Agent Tracking Environment. It encapsulates an environment with arbitrary behind-the-scenes dynamics. This environment is partially observed for both teams.
The main API methods that users of this class need to know are:
step
reset
render
close
seed
send_messages (new method)
receive_messages (new method)
load_config (new method)
And set the following attributes:
- action_space: A tuple of two Space objects corresponding to valid joint actions of cameras and targets
- camera_action_space: The Space object corresponding to a single camera's valid actions
- camera_joint_action_space: The Space object corresponding to valid joint actions of all cameras
- target_action_space: The Space object corresponding to a single target's valid actions
- target_joint_action_space: The Space object corresponding to valid joint actions of all targets
- observation_space: A tuple of two Space objects corresponding to valid joint observations of cameras and targets
- camera_observation_space: The Space object corresponding to a single camera's valid observations
- camera_joint_observation_space: The Space object corresponding to valid joint observations of all cameras
- target_observation_space: The Space object corresponding to a single target's valid observations
- target_joint_observation_space: The Space object corresponding to valid joint observations of all targets
The methods are accessed publicly as “step”, “reset”, etc…
- metadata: Dict[str, Any] = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 60, 'video.output_frames_per_second': 60}
- DEFAULT_CONFIG_FILE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets/MATE-4v8-9.yaml')
The default configuration file.
- __init__(config: Dict[str, Any] | str | None = None, **kwargs) None [source]
Initialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- action_space: Space[ActType]
- observation_space: Space[ObsType]
- load_config(config: Dict[str, Any] | str | None = None) None [source]
Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- Parameters:
config (Optional[Union[Dict[str, Any], str]]) – a dictionary mapping or a path to a readable JSON/YAML file
Examples
You can change the environment configuration without creating a new environment, and this will keep the wrappers you add.
>>> env = mate.make('MultiAgentTracking-v0', config='MATE-4v8-9.yaml')
>>> env = mate.MultiCamera(env, target_agent=mate.GreedyTargetAgent(seed=0))
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 8 targets, 9 obstacles)>
>>> env.load_config('MATE-4v2-9.yaml')
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 2 targets, 9 obstacles)>
- step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]] [source]
Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state.
Accepts a tuple of cameras’ joint action and targets’ joint action, and returns a tuple (observation, reward, done, info).
- Parameters:
action (Tuple[np.ndarray, np.ndarray]) – a tuple of joint actions provided by the camera agents and the target agents
- Returns:
observation (Tuple[np.ndarray, np.ndarray]): a tuple of the two teams' joint observations of the current environment
reward (Tuple[float, float]): a tuple of the amounts of reward returned after the previous action
done (bool): whether the episode has ended, in which case further step() calls will return undefined results
info (Tuple[List[dict], List[dict]]): auxiliary diagnostic information (helpful for debugging, and sometimes learning)
- Return type:
observation (Tuple[np.ndarray, np.ndarray])
- reset(*, seed: int | None = None) Tuple[ndarray, ndarray] [source]
Resets the environment to an initial state and returns an initial observation. The entities (cameras, targets and obstacles) may be shuffled if not explicitly disabled in configuration.
Note that unless an explicit seed is provided, this function will not reset the environment's random number generator(s). Random variables in the environment's state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes.
- Parameters:
seed (int) – the seed for the random number generator(s)
- Returns:
the initial observations of all cameras and targets.
- Return type:
observations (Tuple[numpy.ndarray, np.ndarray])
- send_messages(messages: Message | Iterable[Message]) None [source]
Buffer the messages from an agent to others in the same team.
The environment will deliver the messages to their recipients through the method receive_messages(), and also via the info field of step() results.
- receive_messages(agent_id: Tuple[Team, int] | None = None, agent: AgentType | None = None) Tuple[List[List[Message]], List[List[Message]]] | List[Message] [source]
Retrieve the messages to recipients. If no agent is specified, this method will return all the messages to all agents in the environment.
The environment will also put the messages into the recipients' info field of step() results.
- render(mode: str = 'human', window_size: int = 800, onetime_callbacks: Iterable[Callable[[MultiAgentTracking, str], None]] = ()) bool | ndarray [source]
Render the environment.
The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:
human: render to the current display or terminal and return nothing. Usually for human consumption.
rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.
- add_render_callback(name: str, callback: Callable[[MultiAgentTracking, str], None]) None [source]
Add a callback function to the render function.
This is useful to add additional elements to the rendering results.
- close() None [source]
Perform necessary cleanup.
Environments will automatically close() themselves when garbage collected or when the program exits.
- seed(seed: int | None = None) List[int] [source]
Set the seed for this environment’s random number generators.
Note
Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.
- Returns:
the list of seeds used in this environment's random number generators. The first value in the list should be the "main" seed, or the value which a reproducer should pass to 'seed'. Often, the main seed equals the provided 'seed', but this won't be true if seed=None, for example.
- Return type:
List[int]
- property np_random: RandomState
The main random number generator of the environment.
- route_messages(messages: List[Message]) List[Message] [source]
Convert broadcast messages to peer-to-peer forms.
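route_messages() above converts broadcast messages into peer-to-peer form. A minimal sketch of that idea, assuming a message carries a sender, an optional recipient (None meaning broadcast to the whole team), and a payload; this `Message` shape is illustrative, not mate's actual class:

```python
from dataclasses import dataclass, replace
from typing import List, Optional

@dataclass(frozen=True)
class Message:
    sender: int
    recipient: Optional[int]  # None means broadcast to the whole team
    content: str

def route_messages(messages: List[Message], team_size: int) -> List[Message]:
    """Expand broadcasts into one peer-to-peer message per teammate."""
    routed: List[Message] = []
    for message in messages:
        if message.recipient is not None:
            routed.append(message)  # already peer-to-peer, keep as-is
        else:
            routed.extend(
                replace(message, recipient=peer)
                for peer in range(team_size)
                if peer != message.sender  # do not send to oneself
            )
    return routed
```

A broadcast from agent 0 in a team of three thus becomes two peer-to-peer messages, to agents 1 and 2.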
- property targets_start_with_cargoes: bool
Whether cargoes are always assigned to the targets at the beginning of an episode.
mate.utils module
Utility functions for the Multi-Agent Tracking Environment.
- mate.utils.seed_everything(seed: int) None [source]
Set the seed for global random number generators.
- mate.utils.RAD2DEG = 57.29577951308232
Coefficient that converts a number in radians to the equivalent number in degrees. The actual value is \(\frac{180}{\pi}\).
- mate.utils.DEG2RAD = 0.017453292519943295
Coefficient that converts a number in degrees to the equivalent number in radians. The actual value is \(\frac{\pi}{180}\).
- mate.utils.sin_deg(x)[source]
Trigonometric sine in degrees, element-wise.
\[\sin_{\text{deg}} ( x ) = \sin \left( \frac{\pi}{180} x \right)\]
- mate.utils.cos_deg(x)[source]
Trigonometric cosine in degrees, element-wise.
\[\cos_{\text{deg}} ( x ) = \cos \left( \frac{\pi}{180} x \right)\]
- mate.utils.tan_deg(x)[source]
Trigonometric tangent in degrees, element-wise.
\[\tan_{\text{deg}} ( x ) = \tan \left( \frac{\pi}{180} x \right)\]
- mate.utils.arcsin_deg(x)[source]
Trigonometric inverse sine in degrees, element-wise.
\[\arcsin_{\text{deg}} ( x ) = \frac{180}{\pi} \arcsin ( x )\]
- mate.utils.arccos_deg(x)[source]
Trigonometric inverse cosine in degrees, element-wise.
\[\arccos_{\text{deg}} ( x ) = \frac{180}{\pi} \arccos ( x )\]
- mate.utils.arctan2_deg(y, x)[source]
Element-wise arc tangent of y/x in degrees, choosing the quadrant correctly.
\[\operatorname{arctan2}_{\text{deg}} ( y, x ) = \frac{180}{\pi} \operatorname{arctan2} ( y, x )\]
- mate.utils.cartesian2polar(x, y)[source]
Convert cartesian coordinates to polar coordinates in degrees, element-wise.
\[\operatorname{cartesian2polar} ( x, y ) = \left( \sqrt{x^2 + y^2}, \operatorname{arctan2}_{\text{deg}} ( y, x ) \right)\]
- mate.utils.polar2cartesian(rho, phi)[source]
Convert polar coordinates to cartesian coordinates in degrees, element-wise.
\[\operatorname{polar2cartesian} ( \rho, \phi ) = \left( \rho \cos_{\text{deg}} ( \phi ), \rho \sin_{\text{deg}} ( \phi ) \right)\]
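The degree-based helpers and coordinate conversions above can be reproduced with the standard library. This is a sketch consistent with the formulas shown, not mate's actual (NumPy-vectorized) implementation:

```python
import math

RAD2DEG = 180.0 / math.pi  # ≈ 57.29577951308232
DEG2RAD = math.pi / 180.0  # ≈ 0.017453292519943295

def sin_deg(x: float) -> float:
    """Sine of an angle given in degrees."""
    return math.sin(DEG2RAD * x)

def cos_deg(x: float) -> float:
    """Cosine of an angle given in degrees."""
    return math.cos(DEG2RAD * x)

def cartesian2polar(x: float, y: float):
    """(x, y) -> (rho, phi) with phi in degrees, using quadrant-aware atan2."""
    return math.hypot(x, y), RAD2DEG * math.atan2(y, x)

def polar2cartesian(rho: float, phi: float):
    """(rho, phi in degrees) -> (x, y)."""
    return rho * cos_deg(phi), rho * sin_deg(phi)
```

Round-tripping a point through the two conversions recovers it up to floating-point error.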
- class mate.utils.Vector2D(vector=None, norm=None, angle=None, origin=None)[source]
Bases:
object
2D Vector.
- property vector
- property x
- property y
- property endpoint
- property angle
- property norm
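The Vector2D properties listed above (vector, norm, angle, endpoint) can be sketched as follows. The constructor here only accepts a Cartesian vector plus an origin; mate's class also accepts norm/angle construction, which this sketch omits:

```python
import math

class Vector2D:
    """Minimal 2D vector sketch with an origin and polar accessors."""

    def __init__(self, vector=(0.0, 0.0), origin=(0.0, 0.0)):
        self.x, self.y = float(vector[0]), float(vector[1])
        self.origin = (float(origin[0]), float(origin[1]))

    @property
    def vector(self):
        return (self.x, self.y)

    @property
    def norm(self):
        # Euclidean length of the vector.
        return math.hypot(self.x, self.y)

    @property
    def angle(self):
        # Angle in degrees, measured counterclockwise from the +x axis.
        return math.degrees(math.atan2(self.y, self.x))

    @property
    def endpoint(self):
        # Origin translated by the vector.
        return (self.origin[0] + self.x, self.origin[1] + self.y)
```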
Module contents
MATE: The Multi-Agent Tracking Environment.
- mate.make(id: str | EnvSpec, max_episode_steps: int | None = None, autoreset: bool = False, apply_api_compatibility: bool | None = None, disable_env_checker: bool | None = None, **kwargs) Env [source]
Create an environment according to the given ID.
To list all available environments, use gym.envs.registry.keys().
- Parameters:
id – Name of the environment. Optionally, a module to import can be included, e.g. 'module:Env-v0'
max_episode_steps – Maximum length of an episode (TimeLimit wrapper).
autoreset – Whether to automatically reset the environment after each episode (AutoResetWrapper).
apply_api_compatibility – Whether to wrap the environment with the StepAPICompatibility wrapper, which converts the step return from a single done bool to separate termination and truncation bools. If None (the default), the environment specification's apply_api_compatibility value (default False) is used; if True, the wrapper is applied; if False, it is not.
disable_env_checker – Whether to skip the environment checker. If None, the environment specification's disable_env_checker is used (default False, i.e. the checker runs); if True, the checker is skipped; if False, it runs.
kwargs – Additional arguments to pass to the environment constructor.
- Returns:
An instance of the environment.
- Raises:
Error – If the
id
doesn’t exist then an error is raised
- mate.camera_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box [source]
Get the space object of a single camera’s observation from the given number of entities.
- mate.target_observation_space_of(num_cameras: int, num_targets: int, num_obstacles: int) Box [source]
Get the space object of a single target’s observation from the given number of entities.
- mate.observation_space_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) Box [source]
Get the space object of a single agent’s observation of the given team from the given number of entities.
- mate.camera_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The start indices of each part of the camera observation.
- mate.target_observation_indices_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The start indices of each part of the target observation.
- mate.observation_indices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The start indices of each part of the observation.
- mate.camera_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice] [source]
The slices of each part of the camera observation.
- mate.target_observation_slices_of(num_cameras: int, num_targets: int, num_obstacles: int) Dict[str, slice] [source]
The slices of each part of the target observation.
- mate.observation_slices_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
The slices of each part of the observation.
- mate.camera_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Get the bit mask from the given number of entities. A bit is true if the corresponding entry in a single camera's observation is a coordinate value (excluding the current camera itself).
- mate.target_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Get the bit mask from the given number of entities. A bit is true if the corresponding entry in a single target's observation is a coordinate value (excluding the current target itself).
- mate.coordinate_mask_of(team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Get the bit mask from the given number of entities. A bit is true if the corresponding entry in a single agent's observation is a coordinate value (including the current agent itself).
- mate.read_config(config_or_path: Dict[str, Any] | str | None = None, **kwargs) Dict[str, Any] [source]
Load configuration from a dictionary mapping or a JSON/YAML file.
- class mate.MultiAgentTracking(config: Dict[str, Any] | str | None = None, **kwargs)[source]
Bases: Env, EzPickle
The main class of the Multi-Agent Tracking Environment. It encapsulates an environment with arbitrary behind-the-scenes dynamics. This environment is partially observed for both teams.
The main API methods that users of this class need to know are:
step
reset
render
close
seed
send_messages (new method)
receive_messages (new method)
load_config (new method)
And set the following attributes:
- action_space: A tuple of two Space objects corresponding to valid joint actions of cameras and targets
- camera_action_space: The Space object corresponding to a single camera's valid actions
- camera_joint_action_space: The Space object corresponding to valid joint actions of all cameras
- target_action_space: The Space object corresponding to a single target's valid actions
- target_joint_action_space: The Space object corresponding to valid joint actions of all targets
- observation_space: A tuple of two Space objects corresponding to valid joint observations of cameras and targets
- camera_observation_space: The Space object corresponding to a single camera's valid observations
- camera_joint_observation_space: The Space object corresponding to valid joint observations of all cameras
- target_observation_space: The Space object corresponding to a single target's valid observations
- target_joint_observation_space: The Space object corresponding to valid joint observations of all targets
The methods are accessed publicly as “step”, “reset”, etc…
- metadata: Dict[str, Any] = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 60, 'video.output_frames_per_second': 60}
- DEFAULT_CONFIG_FILE = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/mate-gym/checkouts/latest/mate/assets/MATE-4v8-9.yaml')
The default configuration file.
- __init__(config: Dict[str, Any] | str | None = None, **kwargs) None [source]
Initialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- action_space: Space[ActType]
- observation_space: Space[ObsType]
- load_config(config: Dict[str, Any] | str | None = None) None [source]
Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- Parameters:
config (Optional[Union[Dict[str, Any], str]]) – a dictionary mapping or a path to a readable JSON/YAML file
Examples
You can change the environment configuration without creating a new environment, and this will keep the wrappers you add.
>>> env = mate.make('MultiAgentTracking-v0', config='MATE-4v8-9.yaml')
>>> env = mate.MultiCamera(env, target_agent=mate.GreedyTargetAgent(seed=0))
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 8 targets, 9 obstacles)>
>>> env.load_config('MATE-4v2-9.yaml')
>>> print(env)
<MultiCamera<MultiAgentTracking<MultiAgentTracking-v0>>(4 cameras, 2 targets, 9 obstacles)>
- step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]] [source]
Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state.
Accepts a tuple of cameras’ joint action and targets’ joint action, and returns a tuple (observation, reward, done, info).
- Parameters:
action (Tuple[np.ndarray, np.ndarray]) – a tuple of joint actions provided by the camera agents and the target agents
- Returns:
observation (Tuple[np.ndarray, np.ndarray]): a tuple of the two teams' joint observations of the current environment
reward (Tuple[float, float]): a tuple of the amounts of reward returned after the previous action
done (bool): whether the episode has ended, in which case further step() calls will return undefined results
info (Tuple[List[dict], List[dict]]): auxiliary diagnostic information (helpful for debugging, and sometimes learning)
- Return type:
observation (Tuple[np.ndarray, np.ndarray])
- reset(*, seed: int | None = None) Tuple[ndarray, ndarray] [source]
Resets the environment to an initial state and returns an initial observation. The entities (cameras, targets and obstacles) may be shuffled if not explicitly disabled in configuration.
Note that unless an explicit seed is provided, this function will not reset the environment's random number generator(s). Random variables in the environment's state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes.
- Parameters:
seed (int) – the seed for the random number generator(s)
- Returns:
the initial observations of all cameras and targets.
- Return type:
observations (Tuple[numpy.ndarray, np.ndarray])
- send_messages(messages: Message | Iterable[Message]) None [source]
Buffer the messages from an agent to others in the same team.
The environment will deliver the messages to their recipients through the method receive_messages(), and also via the info field of step() results.
- receive_messages(agent_id: Tuple[Team, int] | None = None, agent: AgentType | None = None) Tuple[List[List[Message]], List[List[Message]]] | List[Message] [source]
Retrieve the messages to recipients. If no agent is specified, this method will return all the messages to all agents in the environment.
The environment will also put the messages into the recipients' info field of step() results.
- render(mode: str = 'human', window_size: int = 800, onetime_callbacks: Iterable[Callable[[MultiAgentTracking, str], None]] = ()) bool | ndarray [source]
Render the environment.
The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:
human: render to the current display or terminal and return nothing. Usually for human consumption.
rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.
- add_render_callback(name: str, callback: Callable[[MultiAgentTracking, str], None]) None [source]
Add a callback function to the render function.
This is useful to add additional elements to the rendering results.
- close() None [source]
Perform necessary cleanup.
Environments will automatically close() themselves when garbage collected or when the program exits.
- seed(seed: int | None = None) List[int] [source]
Set the seed for this environment’s random number generators.
Note
Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.
- Returns:
the list of seeds used in this environment's random number generators. The first value in the list should be the "main" seed, or the value which a reproducer should pass to 'seed'. Often, the main seed equals the provided 'seed', but this won't be true if seed=None, for example.
- Return type:
List[int]
- property np_random: RandomState
The main random number generator of the environment.
- route_messages(messages: List[Message]) List[Message] [source]
Convert broadcast messages to peer-to-peer forms.
- property targets_start_with_cargoes: bool
Whether cargoes are always assigned to the targets at the beginning of an episode.
- class mate.EnhancedObservation(env: BaseEnvironmentType, team: Literal['both', 'camera', 'target', 'none'] = 'both')[source]
Bases:
ObservationWrapper
Enhance the agent's observation, which sets all observation masks to True. The targets can observe the empty status of all warehouses even when far away.
- class mate.SharedFieldOfView(env: BaseEnvironmentType, team: Literal['both', 'camera', 'target', 'none'] = 'both')[source]
Bases:
ObservationWrapper
Share field of view among agents in the same team, which applies the "or" operator over the observation masks. The target agents also share the empty status of warehouses.
- load_config(config: Dict[str, Any] | str | None = None) None [source]
Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- observation(observation) [source]
Returns a modified observation.
- class mate.RescaledObservation(env: MultiAgentTracking)[source]
Bases:
ObservationWrapper
Rescale all entity states in the observation to [-1., +1.]. (Not used in the evaluation script.)
- class mate.RelativeCoordinates(env: MultiAgentTracking)[source]
Bases:
ObservationWrapper
Convert all locations of other entities in the observation to relative coordinates (exclude the current agent itself). (Not used in the evaluation script.)
- class mate.MoreTrainingInformation(env: BaseEnvironmentType)[source]
Bases:
Wrapper
Add more environment and agent information to the info field of step(), enabling full observability of the environment. (Not used in the evaluation script.)
- step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]] | Tuple[Tuple[ndarray, ndarray], Tuple[List[float], List[float]], Tuple[List[bool], List[bool]], Tuple[List[dict], List[dict]]] [source]
Steps through the environment with action.
- class mate.DiscreteCamera(env: BaseEnvironmentType, levels: int = 5)[source]
Bases:
ActionWrapper
Wrap the environment to allow cameras to use discrete actions.
- load_config(config: Dict[str, Any] | str | None = None) None [source]
Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- action(action: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray] [source]
Convert joint action of cameras from discrete to continuous.
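DiscreteCamera above maps a discrete action index onto the continuous camera action space. A plausible sketch, assuming the `levels` discrete values form an evenly spaced grid over each action dimension within [low, high]; this grid layout is an assumption for illustration, not mate's documented mapping:

```python
from itertools import product
from typing import List, Tuple

def build_action_grid(low: Tuple[float, float],
                      high: Tuple[float, float],
                      levels: int = 5) -> List[Tuple[float, float]]:
    """Enumerate levels**2 continuous actions for a 2-D action space."""
    def linspace(lo: float, hi: float, n: int) -> List[float]:
        # n evenly spaced values from lo to hi inclusive.
        step = (hi - lo) / (n - 1)
        return [lo + i * step for i in range(n)]

    axes = [linspace(lo, hi, levels) for lo, hi in zip(low, high)]
    return list(product(*axes))

def discrete_to_continuous(index: int,
                           grid: List[Tuple[float, float]]) -> Tuple[float, float]:
    """Look up the continuous action for a discrete action index."""
    return grid[index]
```

With `levels=3` on a [-1, 1] x [-1, 1] action space, index 4 is the center action (0.0, 0.0).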
- class mate.DiscreteTarget(env: BaseEnvironmentType, levels: int = 5)[source]
Bases:
ActionWrapper
Wrap the environment to allow targets to use discrete actions.
- load_config(config: Dict[str, Any] | str | None = None) None [source]
Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- action(action: Tuple[ndarray, ndarray]) Tuple[ndarray, ndarray] [source]
Convert joint action of targets from discrete to continuous.
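The discrete wrappers map an integer level index onto the continuous action space. A minimal sketch of that mapping for a single action dimension (the evenly spaced `np.linspace` grid is an assumption for illustration, not the wrapper's exact implementation):

```python
import numpy as np

def discrete_to_continuous(index: int, low: float, high: float, levels: int = 5) -> float:
    """Map a discrete action index in [0, levels) onto an evenly spaced
    grid over the continuous action range [low, high]."""
    grid = np.linspace(low, high, num=levels)
    return float(grid[index])

# With 5 levels over [-1, 1], the grid is [-1, -0.5, 0, 0.5, 1].
print(discrete_to_continuous(2, -1.0, 1.0))  # -> 0.0
```

In the real wrappers, `action()` applies such a conversion to the joint action of the whole team before forwarding it to the underlying environment.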
- class mate.AuxiliaryCameraRewards(env: BaseEnvironmentType | MultiCamera | MultiTarget, coefficients: Dict[str, float | Callable[[int, int, int, float, float], float]], reduction: Literal['mean', 'sum', 'max', 'min', 'none'] = 'none')[source]
Bases:
Wrapper
Add additional auxiliary rewards for each individual camera. (Not used in the evaluation script.)
The auxiliary reward is a weighted sum of the following components:
- raw_reward (the higher the better): team reward returned by the environment (shared, range in \((-\infty, 0]\)).
- coverage_rate (the higher the better): coverage rate of all targets in the environment (shared, range in \([0, 1]\)).
- real_coverage_rate (the higher the better): coverage rate of targets with cargoes in the environment (shared, range in \([0, 1]\)).
- mean_transport_rate (the lower the better): mean transport rate of the target team (shared, range in \([0, 1]\)).
- soft_coverage_score (the higher the better): a soft coverage score proportional to the distance from the target to the camera’s boundary (individual, range in \([-1, N_{\mathcal{T}}]\)).
- num_tracked (the higher the better): number of targets tracked by the camera (shared, range in \([0, N_{\mathcal{T}}]\)).
- baseline: constant \(1\).
- ACCEPTABLE_KEYS = ('raw_reward', 'coverage_rate', 'real_coverage_rate', 'mean_transport_rate', 'soft_coverage_score', 'num_tracked', 'baseline')
- REDUCERS = {'max': <function amax>, 'mean': <function mean>, 'min': <function amin>, 'sum': <function sum>}
- step(action: Tuple[ndarray, ndarray] | ndarray) Tuple[Tuple[ndarray, ndarray], Tuple[List[float], List[float]], Tuple[List[bool], List[bool]], Tuple[List[dict], List[dict]]] | Tuple[ndarray, List[float], List[bool], List[dict]] [source]
Steps through the environment with action.
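The weighted-sum construction above can be sketched as follows. The component values and coefficient choices below are illustrative only; the real wrapper computes the components from the environment state each step:

```python
import numpy as np

# Hypothetical per-camera reward components for a 3-camera team.
components = {
    'raw_reward':    np.array([-4.0, -4.0, -4.0]),  # shared team reward
    'coverage_rate': np.array([0.5, 0.5, 0.5]),     # shared
    'num_tracked':   np.array([2.0, 1.0, 0.0]),     # individual
    'baseline':      np.ones(3),                    # constant 1
}
coefficients = {'raw_reward': 1.0, 'coverage_rate': 2.0,
                'num_tracked': 0.5, 'baseline': 0.1}

REDUCERS = {'mean': np.mean, 'sum': np.sum, 'max': np.max, 'min': np.min}

def auxiliary_rewards(components, coefficients, reduction='none'):
    """Weighted sum of reward components, optionally reduced over the team."""
    rewards = sum(coef * components[key] for key, coef in coefficients.items())
    if reduction != 'none':
        # Replace each individual reward with the reduced team value.
        rewards = np.full_like(rewards, REDUCERS[reduction](rewards))
    return rewards
```

With `reduction='none'` each camera keeps its own weighted sum; with `reduction='mean'` all cameras receive the team average.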
- class mate.AuxiliaryTargetRewards(env: BaseEnvironmentType | MultiCamera | MultiTarget, coefficients: Dict[str, float | Callable[[int, int, int, float, float], float]], reduction: Literal['mean', 'sum', 'max', 'min', 'none'] = 'none')[source]
Bases:
Wrapper
Add additional auxiliary rewards for each individual target. (Not used in the evaluation script.)
The auxiliary reward is a weighted sum of the following components:
- raw_reward (the higher the better): team reward returned by the environment (shared, range in \([0, +\infty)\)).
- coverage_rate (the lower the better): coverage rate of all targets in the environment (shared, range in \([0, 1]\)).
- real_coverage_rate (the lower the better): coverage rate of targets with cargoes in the environment (shared, range in \([0, 1]\)).
- mean_transport_rate (the higher the better): mean transport rate of the target team (shared, range in \([0, 1]\)).
- normalized_goal_distance (the lower the better): the normalized distance to the destination, or to the nearest non-empty warehouse when the target is not loaded (individual, range in \([0, \sqrt{2}]\)).
- sparse_delivery (the higher the better): a boolean value indicating whether the target reaches the destination (individual, range in \(\{0, 1\}\)).
- soft_coverage_score (the lower the better): a soft coverage score proportional to the distance from the target to the camera’s boundary (individual, range in \([-1, N_{\mathcal{C}}]\)).
- is_tracked (the lower the better): a boolean value indicating whether the target is tracked by any camera (individual, range in \(\{0, 1\}\)).
- is_colliding (the lower the better): a boolean value indicating whether the target is colliding with obstacles, cameras’ barriers, or the terrain boundary (individual, range in \(\{0, 1\}\)).
- baseline: constant \(1\).
- ACCEPTABLE_KEYS = ('raw_reward', 'coverage_rate', 'real_coverage_rate', 'mean_transport_rate', 'normalized_goal_distance', 'sparse_delivery', 'soft_coverage_score', 'is_tracked', 'is_colliding', 'baseline')
- REDUCERS = {'max': <function amax>, 'mean': <function mean>, 'min': <function amin>, 'sum': <function sum>}
- mate.group_reset(agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray]) None [source]
Reset a group of agents.
- mate.group_step(env: BaseEnvironmentType, agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray], infos: List[dict] | None = None, deterministic: bool | None = None) List[int | ndarray] [source]
Helper function to do an environment step for a group of agents.
- mate.group_observe(agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray], infos: List[dict] | None = None) List[int | ndarray] [source]
Set the observation for a group of agents.
- mate.group_communicate(env: BaseEnvironmentType, agents: Iterable[AgentType]) None [source]
Exchange messages between a group of agents and the environment.
- mate.group_act(agents: Iterable[AgentType], joint_observation: ndarray | Iterable[ndarray], infos: List[dict] | None = None, deterministic: bool | None = None) List[int | ndarray] [source]
Get the joint action of a group of agents.
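Together, these helpers implement the usual rollout pattern: reset the agents with the initial joint observation, then let them observe, communicate, and act each step. A self-contained mimic of that pattern with stub agents (the `StubAgent` class and its trivial echo policy are assumptions for illustration; the real helpers operate on `AgentBase` subclasses and a `MultiAgentTracking` environment):

```python
from typing import Iterable, List

class StubAgent:
    """Minimal stand-in for an AgentBase subclass."""
    def reset(self, observation):
        self.last_observation = observation
    def observe(self, observation, info=None):
        self.last_observation = observation
    def act(self, observation, info=None, deterministic=None):
        # Trivial policy: echo the observation back as the action.
        return observation

def group_reset(agents: Iterable[StubAgent], joint_observation) -> None:
    """Reset each agent with its slice of the joint observation."""
    for agent, observation in zip(agents, joint_observation):
        agent.reset(observation)

def group_act(agents: Iterable[StubAgent], joint_observation,
              infos=None, deterministic=None) -> List:
    """Collect one action per agent into a joint action."""
    return [agent.act(obs, None, deterministic)
            for agent, obs in zip(agents, joint_observation)]

agents = [StubAgent() for _ in range(3)]
group_reset(agents, [0, 1, 2])
print(group_act(agents, [10, 11, 12]))  # -> [10, 11, 12]
```

`group_step()` additionally forwards the joint action to the environment and handles the intra-team communication round via `group_communicate()`.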
- class mate.MultiCamera(env: BaseEnvironmentType, target_agent: TargetAgentBase)[source]
Bases:
SingleTeamMultiAgent
Wrap the environment into a single-team multi-agent environment so that users can use the Gym API to train and/or evaluate their camera agents.
- class mate.SingleCamera(env: BaseEnvironmentType, other_camera_agent: CameraAgentBase, target_agent: TargetAgentBase)[source]
Bases:
SingleTeamSingleAgent
Wrap the environment into a single-team single-agent environment so that users can use the Gym API to train and/or evaluate their camera agent.
- class mate.MultiTarget(env: BaseEnvironmentType, camera_agent: CameraAgentBase)[source]
Bases:
SingleTeamMultiAgent
Wrap the environment into a single-team multi-agent environment so that users can use the Gym API to train and/or evaluate their target agents.
- class mate.SingleTarget(env: BaseEnvironmentType, other_target_agent: TargetAgentBase, camera_agent: CameraAgentBase)[source]
Bases:
SingleTeamSingleAgent
Wrap the environment into a single-team single-agent environment so that users can use the Gym API to train and/or evaluate their target agent.
- class mate.MessageFilter(env: MultiAgentTracking, filter: Callable[[MultiAgentTracking, Message], bool])[source]
Bases:
Wrapper
Filter messages from agents of intra-team communications. (Not used in the evaluation script.)
Users can use this wrapper to implement a communication channel with limited bandwidth, limited communication range, or random dropout. This wrapper can be applied multiple times with different filter functions.
Note
The filter function can also modify the message content. Users can use this to add channel signal noises etc.
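A filter is a predicate over `(environment, message)` pairs; returning `False` drops the message. A sketch of a dropout-style filter under an assumed message shape (the `Message` namedtuple below is hypothetical; the real `Message` class in the mate package carries more fields):

```python
import random
from collections import namedtuple

# Hypothetical message record for illustration only.
Message = namedtuple('Message', ['sender', 'recipient', 'content'])

def random_dropout_filter(dropout_rate: float):
    """Build a filter function that randomly drops messages
    with the given probability."""
    def filter_fn(env, message) -> bool:
        return random.random() >= dropout_rate
    return filter_fn

messages = [Message(0, 1, 'a'), Message(1, 0, 'b')]
keep_all = random_dropout_filter(0.0)   # passes every message
drop_all = random_dropout_filter(1.0)   # drops every message
print([m.content for m in messages if keep_all(None, m)])  # -> ['a', 'b']
print([m.content for m in messages if drop_all(None, m)])  # -> []
```

The `RestrictedCommunicationRange`, `RandomMessageDropout`, and `NoCommunication` wrappers below are all specializations of this pattern.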
- class mate.RestrictedCommunicationRange(env: MultiAgentTracking, range_limit: float)[source]
Bases:
MessageFilter
Add a restricted communication range to channels. (Not used in the evaluation script.)
- class mate.RandomMessageDropout(env: MultiAgentTracking, dropout_rate: float)[source]
Bases:
MessageFilter
Randomly drop messages in communication channels. (Not used in the evaluation script.)
- class mate.NoCommunication(env: MultiAgentTracking, team: Literal['both', 'camera', 'target', 'none'] = 'both')[source]
Bases:
MessageFilter
Disable intra-team communications, i.e., filter out all messages.
- class mate.ExtraCommunicationDelays(env: MultiAgentTracking, delay: int | Callable[[MultiAgentTracking, Message], int] = 3)[source]
Bases:
Wrapper
Add extra message delays to communication channels. (Not used in the evaluation script.)
Users can use this wrapper to implement a communication channel with random delays.
- class mate.RenderCommunication(env: MultiAgentTracking, duration: int | None = 20)[source]
Bases:
Wrapper
Draw arrows for intra-team communications in rendering results.
- load_config(config: Dict[str, Any] | str | None = None) None [source]
Reinitialize the Multi-Agent Tracking Environment from a dictionary mapping or a JSON/YAML file.
- step(action: Tuple[ndarray, ndarray]) Tuple[Tuple[ndarray, ndarray], Tuple[float, float], bool, Tuple[List[dict], List[dict]]] [source]
Steps through the environment with action.
- callback(unwrapped: MultiAgentTracking, mode: str) None [source]
Draw communication messages as arrows.
- class mate.RepeatedRewardIndividualDone(env: BaseEnvironmentType | MultiCamera | MultiTarget, target_done_at_destination=False)[source]
Bases:
Wrapper
Repeat the reward field and assign individual done field of step(), which is similar to the OpenAI Multi-Agent Particle Environment. (Not used in the evaluation script.)
- class mate.WrapperSpec(wrapper, *args, **kwargs)[source]
Bases:
object
Helper class for creating environments with wrappers.
- class mate.CameraAgentBase(seed: int | None = None)[source]
Bases:
AgentBase
Base class for camera agents.
- STATE_CLASS
alias of
CameraStatePrivate
- TEAMMATE_STATE_CLASS
alias of
CameraStatePublic
- OPPONENT_STATE_CLASS
alias of
TargetStatePublic
- class mate.TargetAgentBase(seed: int | None = None)[source]
Bases:
AgentBase
Base class for target agents.
- STATE_CLASS
alias of
TargetStatePrivate
- TEAMMATE_STATE_CLASS
alias of
TargetStatePublic
- OPPONENT_STATE_CLASS
alias of
CameraStatePublic
- class mate.RandomCameraAgent(seed=None, frame_skip=20)[source]
Bases:
CameraAgentBase
Random Camera Agent
Random action.
- __init__(seed=None, frame_skip=20)[source]
Initialize the agent. This function will be called only once on initialization.
Note
Agents can obtain the number of teammates and opponents on reset, but not here. You are responsible for writing scalable policies and code to handle this.
- class mate.RandomTargetAgent(seed=None, frame_skip=20)[source]
Bases:
TargetAgentBase
Random Target Agent
Random action.
- __init__(seed=None, frame_skip=20)[source]
Initialize the agent. This function will be called only once on initialization.
Note
Agents can obtain the number of teammates and opponents on reset, but not here. You are responsible for writing scalable policies and code to handle this.
- class mate.NaiveCameraAgent(seed: int | None = None)[source]
Bases:
CameraAgentBase
Naive Camera Agent
Rotates anti-clockwise with the maximum viewing angle.
- class mate.NaiveTargetAgent(seed=None)[source]
Bases:
TargetAgentBase
Naive Target Agent
Visits all warehouses in turn.
- __init__(seed=None)[source]
Initialize the agent. This function will be called only once on initialization.
- property goal_location
Location of the current warehouse.
- class mate.GreedyCameraAgent(seed=None, memory_period=25, filterout_unloaded=False, filterout_beyond_range=True)[source]
Bases:
CameraAgentBase
Greedy Camera Agent
Greedily tracks the nearest target. If no target is found, it reuses the previous action or generates a new random action.
- __init__(seed=None, memory_period=25, filterout_unloaded=False, filterout_beyond_range=True)[source]
Initialize the agent. This function will be called only once on initialization.
- reset(observation)[source]
Reset the agent. This function will be called immediately after env.reset().
- observe(observation, info=None)[source]
The agent observes the environment before sending messages. This function will be called before send_responses().
- act(observation, info=None, deterministic=None)[source]
Get the agent action by the observation. This function will be called before every env.step().
Greedily track the nearest target. If no target is found, reuse the previous action or generate a new random action.
- process_messages(observation, messages)[source]
Process observation and prepare messages to teammates.
- act_from_target_states(target_states)[source]
Place the selected target at the center of the field of view.
- class mate.GreedyTargetAgent(seed=None, noise_scale=0.5)[source]
Bases:
TargetAgentBase
Greedy Target Agent
Greedily runs towards the destination (desired warehouse) with some noise.
- __init__(seed=None, noise_scale=0.5)[source]
Initialize the agent. This function will be called only once on initialization.
- property goal
Index of the current warehouse.
- property goal_location
Location of the current warehouse.
- reset(observation)[source]
Reset the agent. This function will be called immediately after env.reset().
- observe(observation, info=None)[source]
The agent observes the environment before sending messages. This function will be called before send_responses().
- act(observation, info=None, deterministic=None)[source]
Get the agent action by the observation. This function will be called before every env.step().
Greedily run towards the destination warehouse with some noise.
- process_messages(observation, messages)[source]
Process observation and prepare messages to teammates.
- class mate.HeuristicCameraAgent(seed=None)[source]
Bases:
CameraAgentBase
Heuristic Camera Agent
Greedily maximizes the heuristic scores. All agents send their observations to the centralized controller (agent 0), which then sends a goal state (camera pose) back to each agent.
- __init__(seed=None)[source]
Initialize the agent. This function will be called only once on initialization.
- reset(observation)[source]
Reset the agent. This function will be called immediately after env.reset().
- act(observation, info=None, deterministic=None)[source]
Get the agent action by the observation. This function will be called before every env.step().
Steer the camera towards the goal state (camera pose) received from the centralized controller.
- send_requests()[source]
Prepare messages to communicate with other agents in the same team. This function will be called after observe() but before receive_requests().
- receive_requests(messages)[source]
Receive messages from other agents in the same team. This function will be called after send_requests().
- send_responses()[source]
Prepare messages to communicate with other agents in the same team. This function will be called after receive_requests().
- receive_responses(messages)[source]
Receive messages from other agents in the same team. This function will be called after send_responses() but before act().
- class mate.HeuristicTargetAgent(seed=None, noise_scale=0.5)[source]
Bases:
GreedyTargetAgent
Heuristic Target Agent
Greedily runs towards the destination (desired warehouse) with some noise. In addition to the behavior of the greedy target agent, a drift term is added that pushes the target away from the center of the cameras’ fields of view.
- class mate.MixtureCameraAgent(candidates, weights=None, mixture_seed=None, seed=None)[source]
Bases:
MixtureAgentMixIn
,CameraAgentBase
Mixture Camera Agent
Randomly chooses an underlying camera agent from candidates at the start of each episode.
- class mate.MixtureTargetAgent(candidates, weights=None, mixture_seed=None, seed=None)[source]
Bases:
MixtureAgentMixIn
,TargetAgentBase
Mixture Target Agent
Randomly chooses an underlying target agent from candidates at the start of each episode.
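The mixture agents resample their underlying policy at every reset. A minimal sketch of that weighted choice (the `reset` hook, the weight normalization, and the `MixtureAgentSketch` name are assumptions for illustration, not the mate implementation):

```python
import numpy as np

class MixtureAgentSketch:
    """Pick one underlying agent from the candidates at every episode start."""

    def __init__(self, candidates, weights=None, mixture_seed=None):
        self.candidates = list(candidates)
        if weights is None:
            weights = np.ones(len(self.candidates))
        self.weights = np.asarray(weights, dtype=float)
        self.weights /= self.weights.sum()  # normalize to a distribution
        self.rng = np.random.default_rng(mixture_seed)
        self.agent = None

    def reset(self, observation=None):
        # Resample the underlying agent for the new episode.
        index = self.rng.choice(len(self.candidates), p=self.weights)
        self.agent = self.candidates[index]

    def act(self, observation, info=None, deterministic=None):
        # Delegate to the currently selected underlying agent.
        return self.agent.act(observation, info, deterministic)
```

Passing `weights=None` yields a uniform mixture; a dedicated `mixture_seed` keeps the candidate choice reproducible independently of the agents' own action seeds.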
- mate.convert_coordinates(observation: ndarray, team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Convert all locations of other entities in the observation to relative coordinates (exclude the current agent itself).
- mate.normalize_observation(observation: ndarray, observation_space: Box, additional_mask: ndarray | None = None) ndarray [source]
Rescale all entity states in the observation to [-1., +1.].
- mate.rescale_observation(observation: ndarray, team: Team, num_cameras: int, num_targets: int, num_obstacles: int) ndarray [source]
Rescale all entity states in the observation to [-1., +1.].
- mate.split_observation(observation: ndarray, team: Team, num_cameras: int, num_targets: int, num_obstacles: int) Tuple[ndarray, ndarray, ndarray, ndarray, ndarray] [source]
Splits the concatenated observations into parts.
- class mate.CameraStatePrivate(state: ndarray, index: int)[source]
Bases:
CameraStatePublic
- DIM = 9
- property action_space: Box
- class mate.TargetStatePrivate(state: ndarray, index: int)[source]
Bases:
StateBase
- DIM = 14
- property goal_bits: ndarray
- property empty_bits: ndarray
- property action_space: Box
- mate.sin_deg(x)[source]
Trigonometric sine in degrees, element-wise.
\[\sin_{\text{deg}} ( x ) = \sin \left( \frac{\pi}{180} x \right)\]
- mate.cos_deg(x)[source]
Trigonometric cosine in degrees, element-wise.
\[\cos_{\text{deg}} ( x ) = \cos \left( \frac{\pi}{180} x \right)\]
- mate.tan_deg(x)[source]
Trigonometric tangent in degrees, element-wise.
\[\tan_{\text{deg}} ( x ) = \tan \left( \frac{\pi}{180} x \right)\]
- mate.arcsin_deg(x)[source]
Trigonometric inverse sine in degrees, element-wise.
\[\arcsin_{\text{deg}} ( x ) = \frac{180}{\pi} \arcsin ( x )\]
- mate.arccos_deg(x)[source]
Trigonometric inverse cosine in degrees, element-wise.
\[\arccos_{\text{deg}} ( x ) = \frac{180}{\pi} \arccos ( x )\]
- mate.arctan2_deg(y, x)[source]
Element-wise arc tangent of y/x in degrees, choosing the quadrant correctly.
\[\operatorname{arctan2}_{\text{deg}} ( y, x ) = \frac{180}{\pi} \operatorname{arctan2} ( y, x )\]
- mate.cartesian2polar(x, y)[source]
Convert cartesian coordinates to polar coordinates in degrees, element-wise.
\[\operatorname{cartesian2polar} ( x, y ) = \left( \sqrt{x^2 + y^2}, \operatorname{arctan2}_{\text{deg}} ( y, x ) \right)\]
- mate.polar2cartesian(rho, phi)[source]
Convert polar coordinates to cartesian coordinates in degrees, element-wise.
\[\operatorname{polar2cartesian} ( \rho, \phi ) = \left( \rho \cos_{\text{deg}} ( \phi ), \rho \sin_{\text{deg}} ( \phi ) \right)\]
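The two conversions above are inverses of each other. A direct NumPy transcription of the formulas, assumed faithful to the signatures shown:

```python
import numpy as np

def cartesian2polar(x, y):
    """(x, y) -> (rho, phi), with phi in degrees."""
    return np.hypot(x, y), np.degrees(np.arctan2(y, x))

def polar2cartesian(rho, phi):
    """(rho, phi in degrees) -> (x, y)."""
    phi_rad = np.radians(phi)
    return rho * np.cos(phi_rad), rho * np.sin(phi_rad)

rho, phi = cartesian2polar(1.0, 1.0)
print(rho, phi)            # approximately sqrt(2), 45.0
x, y = polar2cartesian(rho, phi)
print(x, y)                # approximately 1.0, 1.0 (round trip)
```

Working in degrees matches the rest of the package's trigonometric helpers (`sin_deg`, `cos_deg`, etc.).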
- class mate.Vector2D(vector=None, norm=None, angle=None, origin=None)[source]
Bases:
object
2D Vector.
- property vector
- property x
- property y
- property endpoint
- property angle
- property norm