# Source code for mate.constants

"""Constants of the Multi-Agent Tracking Environment."""

import functools
from typing import Dict

import numpy as np
from gym import spaces

from mate.utils import Team


# Explicit public API: geometry/space constants first, then the helper
# functions that derive per-agent observation layouts.
__all__ = [
    # Terrain geometry and warehouse layout.
    'TERRAIN_SIZE',
    'TERRAIN_WIDTH',
    'TERRAIN_SPACE',
    'WAREHOUSES',
    'NUM_WAREHOUSES',
    'WAREHOUSE_RADIUS',
    # Entity geometry limits.
    'MAX_CAMERA_VIEWING_ANGLE',
    'TARGET_RADIUS',
    # Observation header (preserved data).
    'PRESERVED_SPACE',
    'PRESERVED_DIM',
    'OBSERVATION_OFFSET',
    # Per-entity state dimensions and space objects.
    'CAMERA_STATE_DIM_PUBLIC',
    'CAMERA_STATE_SPACE_PUBLIC',
    'CAMERA_STATE_DIM_PRIVATE',
    'CAMERA_STATE_SPACE_PRIVATE',
    'TARGET_STATE_DIM_PUBLIC',
    'TARGET_STATE_SPACE_PUBLIC',
    'TARGET_STATE_DIM_PRIVATE',
    'TARGET_STATE_SPACE_PRIVATE',
    'OBSTACLE_STATE_DIM',
    'OBSTACLE_STATE_SPACE',
    # Action dimensions and defaults.
    'CAMERA_ACTION_DIM',
    'CAMERA_DEFAULT_ACTION',
    'TARGET_ACTION_DIM',
    'TARGET_DEFAULT_ACTION',
    # Observation-layout helper functions (cached per entity counts).
    'camera_observation_space_of',
    'target_observation_space_of',
    'observation_space_of',
    'camera_observation_indices_of',
    'target_observation_indices_of',
    'observation_indices_of',
    'camera_observation_slices_of',
    'target_observation_slices_of',
    'observation_slices_of',
    'camera_coordinate_mask_of',
    'target_coordinate_mask_of',
    'coordinate_mask_of',
]

TERRAIN_SIZE = 1000.0
"""Terrain size. The terrain is a 2000.0 by 2000.0 square."""

TERRAIN_WIDTH = 2.0 * TERRAIN_SIZE
"""Terrain width. The terrain is a 2000.0 by 2000.0 square."""

TERRAIN_SPACE = spaces.Box(
    low=np.array([-TERRAIN_SIZE, -TERRAIN_SIZE]),
    high=np.array([+TERRAIN_SIZE, +TERRAIN_SIZE]),
    dtype=np.float64,
)
"""The space object of the terrain. The terrain is a 2000.0 by 2000.0 square.
(i.e., a square within range of :math:`[-1000, +1000] \\times [-1000, +1000]` in cartesian coordinates.)
"""

WAREHOUSE_RADIUS = 0.075 * TERRAIN_SIZE
"""Half width of the squared warehouses."""

WAREHOUSES = (TERRAIN_SIZE - WAREHOUSE_RADIUS) * np.array(
    [[+1.0, +1.0], [-1.0, +1.0], [-1.0, -1.0], [+1.0, -1.0]]
)
"""Center locations of the warehouses."""

NUM_WAREHOUSES = len(WAREHOUSES)
"""Number of warehouses."""

MAX_CAMERA_VIEWING_ANGLE = 180.0
"""Maximum viewing angle of cameras **in degrees**."""

TARGET_RADIUS = 0.0
"""Radius of targets."""

PRESERVED_SPACE = spaces.Box(
    low=np.concatenate(
        [[0] * 4, 2.0 * np.tile(TERRAIN_SPACE.low, reps=NUM_WAREHOUSES), [0.0]]
    ).astype(np.float64),
    high=np.concatenate(
        [[+np.inf] * 4, 2.0 * np.tile(TERRAIN_SPACE.high, reps=NUM_WAREHOUSES), [TERRAIN_SIZE]]
    ).astype(np.float64),
    dtype=np.float64,
)
"""The space object of agent's preserved data."""

PRESERVED_DIM = 3 + 1 + 2 * NUM_WAREHOUSES + 1
"""Preserved observation dimension,
which holds the number of entities in the environment and the index of current agent.
"""

OBSERVATION_OFFSET = PRESERVED_DIM
"""Preserved observation dimension,
which holds the number of entities in the environment and the index of current agent.
"""

CAMERA_STATE_DIM_PUBLIC = 6
"""Dimension of camera's public state."""

CAMERA_STATE_SPACE_PUBLIC = spaces.Box(
    low=np.append(2.0 * TERRAIN_SPACE.low, [0.0, -TERRAIN_WIDTH, -TERRAIN_WIDTH, 0.0]).astype(
        np.float64
    ),
    high=np.append(
        2.0 * TERRAIN_SPACE.high,
        [TERRAIN_SIZE, TERRAIN_WIDTH, TERRAIN_WIDTH, MAX_CAMERA_VIEWING_ANGLE],
    ).astype(np.float64),
    dtype=np.float64,
)
"""The space object of camera's public state."""

CAMERA_STATE_DIM_PRIVATE = 9
"""Dimension of camera's private state."""

CAMERA_STATE_SPACE_PRIVATE = spaces.Box(
    low=np.append(CAMERA_STATE_SPACE_PUBLIC.low, [0.0, 0.0, 0.0]).astype(np.float64),
    high=np.append(
        CAMERA_STATE_SPACE_PUBLIC.high,
        [TERRAIN_WIDTH, MAX_CAMERA_VIEWING_ANGLE, MAX_CAMERA_VIEWING_ANGLE],
    ).astype(np.float64),
    dtype=np.float64,
)
"""The space object of camera's private state."""

TARGET_STATE_DIM_PUBLIC = 4
"""Dimension of target's public state."""

# Use space Box(low=-1, high=1) as `is_loaded` bit space (the actual range is [0, 1]).
# The boolean bit will remain unchanged after observation normalization.
TARGET_STATE_SPACE_PUBLIC = spaces.Box(
    low=np.append(2.0 * TERRAIN_SPACE.low, [0.0, -1.0]).astype(np.float64),
    high=np.append(2.0 * TERRAIN_SPACE.high, [TERRAIN_WIDTH, 1.0]).astype(np.float64),
    dtype=np.float64,
)
"""The space object of target's public state."""

# 4 public entries + 2 extra private entries + 2 entries per warehouse.
TARGET_STATE_DIM_PRIVATE = 6 + NUM_WAREHOUSES * 2
"""Dimension of target's private state."""

# Use space Box(low=-1, high=1) as `empty_bits` space (the actual range is [0, 1]).
# The boolean bits will remain unchanged after observation normalization.
TARGET_STATE_SPACE_PRIVATE = spaces.Box(
    # The private state extends the public state with: one entry bounded by
    # TERRAIN_WIDTH, one entry in [1, 2], NUM_WAREHOUSES non-negative entries,
    # and NUM_WAREHOUSES `empty_bits` (see note above). Exact field semantics
    # are not visible in this module — confirm against the target entity.
    low=np.concatenate(
        [
            TARGET_STATE_SPACE_PUBLIC.low,
            [0.0, 1.0],
            [0.0] * NUM_WAREHOUSES,
            [-1.0] * NUM_WAREHOUSES,
        ]
    ).astype(np.float64),
    high=np.concatenate(
        [
            TARGET_STATE_SPACE_PUBLIC.high,
            [TERRAIN_WIDTH, 2.0],
            [+np.inf] * NUM_WAREHOUSES,
            [1.0] * NUM_WAREHOUSES,
        ]
    ).astype(np.float64),
    dtype=np.float64,
)
"""The space object of target's private state."""

OBSTACLE_STATE_DIM = 3
"""Dimension of obstacle's state."""

OBSTACLE_STATE_SPACE = spaces.Box(
    # Layout: (x, y) in the doubled terrain bounds plus a radius-like entry
    # in [0, TERRAIN_SIZE].
    low=np.append(2.0 * TERRAIN_SPACE.low, 0.0).astype(np.float64),
    high=np.append(2.0 * TERRAIN_SPACE.high, TERRAIN_SIZE).astype(np.float64),
    dtype=np.float64,
)
"""The space object of obstacle's state."""

CAMERA_ACTION_DIM = 2
"""Dimension of camera's action."""

# A zero vector: the camera does nothing by default.
CAMERA_DEFAULT_ACTION = np.zeros(CAMERA_ACTION_DIM, dtype=np.float64)
"""Default action of cameras."""

TARGET_ACTION_DIM = 2
"""Dimension of target's action."""

# A zero vector: the target does nothing by default.
TARGET_DEFAULT_ACTION = np.zeros(TARGET_ACTION_DIM, dtype=np.float64)
"""Default action of targets."""


@functools.lru_cache(maxsize=None)
def camera_observation_space_of(
    num_cameras: int, num_targets: int, num_obstacles: int
) -> spaces.Box:
    """Get the space object of a single camera's observation from the given number of entities."""

    # Use space Box(low=-1, high=1) as flag space (the actual range is [0, 1]).
    # The boolean flags will remain unchanged after observation normalization.
    def repeat_with_flag(bounds: np.ndarray, flag: float, count: int) -> np.ndarray:
        # Append the mask-flag bound to one entity's bounds, then repeat per entity.
        return np.tile(np.append(bounds, flag), reps=count)

    lower = np.concatenate(
        [
            PRESERVED_SPACE.low,
            CAMERA_STATE_SPACE_PRIVATE.low,
            repeat_with_flag(TARGET_STATE_SPACE_PUBLIC.low, -1, num_targets),
            repeat_with_flag(OBSTACLE_STATE_SPACE.low, -1, num_obstacles),
            repeat_with_flag(CAMERA_STATE_SPACE_PUBLIC.low, -1, num_cameras),
        ]
    ).astype(np.float64)
    upper = np.concatenate(
        [
            PRESERVED_SPACE.high,
            CAMERA_STATE_SPACE_PRIVATE.high,
            repeat_with_flag(TARGET_STATE_SPACE_PUBLIC.high, 1, num_targets),
            repeat_with_flag(OBSTACLE_STATE_SPACE.high, 1, num_obstacles),
            repeat_with_flag(CAMERA_STATE_SPACE_PUBLIC.high, 1, num_cameras),
        ]
    ).astype(np.float64)
    return spaces.Box(low=lower, high=upper, dtype=np.float64)
@functools.lru_cache(maxsize=None)
def target_observation_space_of(
    num_cameras: int, num_targets: int, num_obstacles: int
) -> spaces.Box:
    """Get the space object of a single target's observation from the given number of entities."""

    # Use space Box(low=-1, high=1) as flag space (the actual range is [0, 1]).
    # The boolean flags will remain unchanged after observation normalization.
    def repeat_with_flag(bounds: np.ndarray, flag: float, count: int) -> np.ndarray:
        # Append the mask-flag bound to one entity's bounds, then repeat per entity.
        return np.tile(np.append(bounds, flag), reps=count)

    lower = np.concatenate(
        [
            PRESERVED_SPACE.low,
            TARGET_STATE_SPACE_PRIVATE.low,
            repeat_with_flag(CAMERA_STATE_SPACE_PUBLIC.low, -1, num_cameras),
            repeat_with_flag(OBSTACLE_STATE_SPACE.low, -1, num_obstacles),
            repeat_with_flag(TARGET_STATE_SPACE_PUBLIC.low, -1, num_targets),
        ]
    ).astype(np.float64)
    upper = np.concatenate(
        [
            PRESERVED_SPACE.high,
            TARGET_STATE_SPACE_PRIVATE.high,
            repeat_with_flag(CAMERA_STATE_SPACE_PUBLIC.high, 1, num_cameras),
            repeat_with_flag(OBSTACLE_STATE_SPACE.high, 1, num_obstacles),
            repeat_with_flag(TARGET_STATE_SPACE_PUBLIC.high, 1, num_targets),
        ]
    ).astype(np.float64)
    return spaces.Box(low=lower, high=upper, dtype=np.float64)
@functools.lru_cache(maxsize=None)
def observation_space_of(
    team: Team, num_cameras: int, num_targets: int, num_obstacles: int
) -> spaces.Box:
    """Get the space object of a single agent's observation of the given team from the given number of entities."""
    # Dispatch on the team's integer value: index 0 -> camera, index 1 -> target.
    per_team_builders = (camera_observation_space_of, target_observation_space_of)
    builder = per_team_builders[team.value]
    return builder(num_cameras, num_targets, num_obstacles)
@functools.lru_cache(maxsize=None)
def camera_observation_indices_of(
    num_cameras: int, num_targets: int, num_obstacles: int
) -> np.ndarray:
    """The start indices of each part of the camera observation."""
    # Each observed entity contributes its public state plus one mask flag;
    # the cumulative sum turns section sizes into start offsets.
    section_sizes = [
        0,
        PRESERVED_DIM,
        CAMERA_STATE_DIM_PRIVATE,
        (TARGET_STATE_DIM_PUBLIC + 1) * num_targets,
        (OBSTACLE_STATE_DIM + 1) * num_obstacles,
        (CAMERA_STATE_DIM_PUBLIC + 1) * num_cameras,
    ]
    return np.cumsum(section_sizes)
@functools.lru_cache(maxsize=None)
def target_observation_indices_of(
    num_cameras: int, num_targets: int, num_obstacles: int
) -> np.ndarray:
    """The start indices of each part of the target observation."""
    # Each observed entity contributes its public state plus one mask flag;
    # the cumulative sum turns section sizes into start offsets.
    section_sizes = [
        0,
        PRESERVED_DIM,
        TARGET_STATE_DIM_PRIVATE,
        (CAMERA_STATE_DIM_PUBLIC + 1) * num_cameras,
        (OBSTACLE_STATE_DIM + 1) * num_obstacles,
        (TARGET_STATE_DIM_PUBLIC + 1) * num_targets,
    ]
    return np.cumsum(section_sizes)
@functools.lru_cache(maxsize=None)
def observation_indices_of(
    team: Team, num_cameras: int, num_targets: int, num_obstacles: int
) -> np.ndarray:
    """The start indices of each part of the observation."""
    # Dispatch on the team's integer value: index 0 -> camera, index 1 -> target.
    per_team_builders = (camera_observation_indices_of, target_observation_indices_of)
    builder = per_team_builders[team.value]
    return builder(num_cameras, num_targets, num_obstacles)
@functools.lru_cache(maxsize=None)
def camera_observation_slices_of(
    num_cameras: int, num_targets: int, num_obstacles: int
) -> Dict[str, slice]:
    """The slices of each part of the camera observation."""
    idx = camera_observation_indices_of(num_cameras, num_targets, num_obstacles)
    # Per-entity chunk widths (public state + trailing mask flag); the mask
    # slices stride over the chunks and pick out the last entry of each.
    opponent_stride = TARGET_STATE_DIM_PUBLIC + 1
    obstacle_stride = OBSTACLE_STATE_DIM + 1
    teammate_stride = CAMERA_STATE_DIM_PUBLIC + 1
    return {
        'preserved_data': slice(idx[0], idx[1]),
        'self_state': slice(idx[1], idx[2]),
        'opponent_states_with_mask': slice(idx[2], idx[3]),
        'opponent_mask': slice(idx[2] + opponent_stride - 1, idx[3], opponent_stride),
        'obstacle_states_with_mask': slice(idx[3], idx[4]),
        'obstacle_mask': slice(idx[3] + obstacle_stride - 1, idx[4], obstacle_stride),
        'teammate_states_with_mask': slice(idx[4], idx[5]),
        'teammate_mask': slice(idx[4] + teammate_stride - 1, idx[5], teammate_stride),
    }
@functools.lru_cache(maxsize=None)
def target_observation_slices_of(
    num_cameras: int, num_targets: int, num_obstacles: int
) -> Dict[str, slice]:
    """The slices of each part of the target observation."""
    idx = target_observation_indices_of(num_cameras, num_targets, num_obstacles)
    # Per-entity chunk widths (public state + trailing mask flag); the mask
    # slices stride over the chunks and pick out the last entry of each.
    opponent_stride = CAMERA_STATE_DIM_PUBLIC + 1
    obstacle_stride = OBSTACLE_STATE_DIM + 1
    teammate_stride = TARGET_STATE_DIM_PUBLIC + 1
    return {
        'preserved_data': slice(idx[0], idx[1]),
        'self_state': slice(idx[1], idx[2]),
        'opponent_states_with_mask': slice(idx[2], idx[3]),
        'opponent_mask': slice(idx[2] + opponent_stride - 1, idx[3], opponent_stride),
        'obstacle_states_with_mask': slice(idx[3], idx[4]),
        'obstacle_mask': slice(idx[3] + obstacle_stride - 1, idx[4], obstacle_stride),
        'teammate_states_with_mask': slice(idx[4], idx[5]),
        'teammate_mask': slice(idx[4] + teammate_stride - 1, idx[5], teammate_stride),
    }
@functools.lru_cache(maxsize=None)
def observation_slices_of(
    team: Team, num_cameras: int, num_targets: int, num_obstacles: int
) -> Dict[str, slice]:
    """The slices of each part of the observation.

    Dispatches to the camera or target variant according to ``team``.
    """
    # Fixed return annotation: the per-team helpers return a dict of named
    # slices, not an ``np.ndarray``.
    return (camera_observation_slices_of, target_observation_slices_of)[team.value](
        num_cameras, num_targets, num_obstacles
    )
@functools.lru_cache(maxsize=None)
def camera_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) -> np.ndarray:
    """Get the bit mask from the given number of entities.

    The bit value is true if the corresponding entry in a single camera's
    observation is a coordinate value (exclude the current camera itself).
    """
    # NOTE: ``np.bool8`` was removed in NumPy 1.24; ``np.bool_`` is the
    # backward-compatible spelling of the same scalar type.
    preserved_mask = np.zeros(PRESERVED_DIM, dtype=np.bool_)
    preserved_mask[-1 - 2 * NUM_WAREHOUSES : -1] = True  # the warehouse locations
    camera_mask = np.zeros(CAMERA_STATE_DIM_PRIVATE, dtype=np.bool_)  # own state excluded
    target_mask = np.zeros(TARGET_STATE_DIM_PUBLIC + 1, dtype=np.bool_)
    target_mask[:2] = True  # first two elements (x, y)
    target_mask = np.tile(target_mask, reps=num_targets)
    obstacle_mask = np.zeros(OBSTACLE_STATE_DIM + 1, dtype=np.bool_)
    obstacle_mask[:2] = True  # first two elements (x, y)
    obstacle_mask = np.tile(obstacle_mask, reps=num_obstacles)
    other_camera_mask = np.zeros(CAMERA_STATE_DIM_PUBLIC + 1, dtype=np.bool_)
    other_camera_mask[:2] = True  # first two elements (x, y)
    other_camera_mask = np.tile(other_camera_mask, reps=num_cameras)
    return np.concatenate(
        [preserved_mask, camera_mask, target_mask, obstacle_mask, other_camera_mask]
    ).astype(np.bool_)
@functools.lru_cache(maxsize=None)
def target_coordinate_mask_of(num_cameras: int, num_targets: int, num_obstacles: int) -> np.ndarray:
    """Get the bit mask from the given number of entities.

    The bit value is true if the corresponding entry in a single target's
    observation is a coordinate value (exclude the current target itself).
    """
    # NOTE: ``np.bool8`` was removed in NumPy 1.24; ``np.bool_`` is the
    # backward-compatible spelling of the same scalar type.
    preserved_mask = np.zeros(PRESERVED_DIM, dtype=np.bool_)
    preserved_mask[-1 - 2 * NUM_WAREHOUSES : -1] = True  # the warehouse locations
    target_mask = np.zeros(TARGET_STATE_DIM_PRIVATE, dtype=np.bool_)  # own state excluded
    camera_mask = np.zeros(CAMERA_STATE_DIM_PUBLIC + 1, dtype=np.bool_)
    camera_mask[:2] = True  # first two elements (x, y)
    camera_mask = np.tile(camera_mask, reps=num_cameras)
    obstacle_mask = np.zeros(OBSTACLE_STATE_DIM + 1, dtype=np.bool_)
    obstacle_mask[:2] = True  # first two elements (x, y)
    obstacle_mask = np.tile(obstacle_mask, reps=num_obstacles)
    other_target_mask = np.zeros(TARGET_STATE_DIM_PUBLIC + 1, dtype=np.bool_)
    other_target_mask[:2] = True  # first two elements (x, y)
    other_target_mask = np.tile(other_target_mask, reps=num_targets)
    return np.concatenate(
        [preserved_mask, target_mask, camera_mask, obstacle_mask, other_target_mask]
    ).astype(np.bool_)
@functools.lru_cache(maxsize=None)
def coordinate_mask_of(
    team: Team, num_cameras: int, num_targets: int, num_obstacles: int
) -> np.ndarray:
    """Get the bit mask from the given number of entities.

    The bit value is true if the corresponding entry in a single agent's
    observation is a coordinate value (exclude the current agent itself).
    """
    # Docstring fix: the per-team helpers mark *coordinate* entries (x, y),
    # with the current agent's own state zeroed out — the previous docstring
    # incorrectly described boolean flags including the agent itself.
    return (camera_coordinate_mask_of, target_coordinate_mask_of)[team.value](
        num_cameras, num_targets, num_obstacles
    )