Source code for mate.wrappers.auxiliary_target_rewards

# pylint: disable=missing-module-docstring

from typing import Callable, Dict, List, Tuple, Union

import gym
import numpy as np

from mate import constants as consts
from mate.wrappers.auxiliary_camera_rewards import AuxiliaryCameraRewards
from mate.wrappers.repeated_reward_individual_done import RepeatedRewardIndividualDone
from mate.wrappers.single_team import MultiCamera, SingleTeamHelper
from mate.wrappers.typing import (
    MultiAgentEnvironmentType,
    WrapperMeta,
    assert_multi_agent_environment,
)


try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal


class AuxiliaryTargetRewards(gym.Wrapper, metaclass=WrapperMeta):
    r"""Add additional auxiliary rewards for each individual target.
    (Not used in the evaluation script.)

    The auxiliary reward is a weighted sum of the following components:

    - ``raw_reward`` (the higher the better): team reward returned by the environment
      (shared, range in :math:`[0, +\infty)`).
    - ``coverage_rate`` (the lower the better): coverage rate of all targets in the environment
      (shared, range in :math:`[0, 1]`).
    - ``real_coverage_rate`` (the lower the better): coverage rate of targets with cargoes in the
      environment (shared, range in :math:`[0, 1]`).
    - ``mean_transport_rate`` (the higher the better): mean transport rate of the target team
      (shared, range in :math:`[0, 1]`).
    - ``normalized_goal_distance`` (the lower the better): the normalized distance to the
      destination, or to the nearest non-empty warehouse when the target is not loaded
      (individual, range in :math:`[0, \sqrt{2}]`).
    - ``sparse_delivery`` (the higher the better): a boolean value that indicates whether the
      target reaches the destination (individual, range in :math:`\{0, 1\}`).
    - ``soft_coverage_score`` (the lower the better): a score proportional to the distance from
      the target to the camera's boundary (individual, range in :math:`[-1, N_{\mathcal{C}}]`).
    - ``is_tracked`` (the lower the better): a boolean value that indicates whether the target is
      tracked by any camera (individual, range in :math:`\{0, 1\}`).
    - ``is_colliding`` (the lower the better): a boolean value that indicates whether the target
      is colliding with obstacles, cameras' barriers, or the terrain boundary
      (individual, range in :math:`\{0, 1\}`).
    - ``baseline``: constant :math:`1`.
    """  # pylint: disable=line-too-long

    ACCEPTABLE_KEYS = (
        'raw_reward',                # team reward
        'coverage_rate',             # team reward
        'real_coverage_rate',        # team reward
        'mean_transport_rate',       # team reward
        'normalized_goal_distance',  # individual reward
        'sparse_delivery',           # individual reward
        'soft_coverage_score',       # individual reward
        'is_tracked',                # individual reward
        'is_colliding',              # individual reward
        'baseline',                  # constant 1
    )  # fmt: skip
    REDUCERS = {
        'mean': np.mean,
        'sum': np.sum,
        'max': np.max,
        'min': np.min,
    }

    def __init__(
        self,
        env: MultiAgentEnvironmentType,
        coefficients: Dict[str, Union[float, Callable[[int, int, int, float, float], float]]],
        reduction: Literal['mean', 'sum', 'max', 'min', 'none'] = 'none',
    ) -> None:
        assert_multi_agent_environment(env)
        assert isinstance(env, RepeatedRewardIndividualDone), (
            f'You should use wrapper `{self.__class__}` with wrapper `RepeatedRewardIndividualDone`. '
            f'Please wrap the environment with wrapper `RepeatedRewardIndividualDone` first. '
            f'Got env = {env}.'
        )
        assert not isinstance(env, MultiCamera), (
            f'You should not use wrapper `{self.__class__}` with wrapper `MultiCamera`. '
            f'Got env = {env}.'
        )
        assert not isinstance(
            env, AuxiliaryTargetRewards
        ), f'You should not use wrapper `{self.__class__}` more than once. Got env = {env}.'
        assert reduction in ('mean', 'sum', 'max', 'min', 'none'), (
            f'Invalid reduction method {reduction}. '
            f'The reduction method should be one of {("mean", "sum", "max", "min")} (for shared reward), '
            f'or "none" for no reduction (for individual reward).'
        )
        assert set(self.ACCEPTABLE_KEYS).issuperset(coefficients.keys()), (
            f'The coefficient mapping only accepts keys in {self.ACCEPTABLE_KEYS}. '
            f'Got list(coefficients.keys()) = {list(coefficients.keys())}.'
        )

        # The coefficient should be a function with signature:
        #   (agent_id: int, episode_id: int, episode_step: int,
        #    raw_reward: float, auxiliary_reward: float) -> float
        # or a constant float number.
        self.coefficients = {}
        for key, coefficient in coefficients.items():
            assert callable(coefficient) or isinstance(coefficient, (float, int)), (
                f'The argument `coefficient` should be a callable function or a float number. '
                f'Got coefficients[{key!r}] = {coefficient!r}.'
            )
            self.coefficients[key] = (
                coefficient if not isinstance(coefficient, int) else float(coefficient)
            )

        super().__init__(env)

        self.episode_id = -1
        self.reduction = reduction
        self.single_team = isinstance(env, SingleTeamHelper)
        self.soft_coverage_score_matrix = None
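    # Illustrative sketch of a callable coefficient implementing a schedule,
    # matching the signature documented above. The 100-episode annealing
    # horizon is an arbitrary choice for the example:
    #
    #     def colliding_coefficient(agent_id, episode_id, episode_step,
    #                               raw_reward, auxiliary_reward):
    #         # Linearly ramp the collision penalty from 0 to -1 over episodes.
    #         return -min(1.0, episode_id / 100.0)
    #
    #     env = AuxiliaryTargetRewards(
    #         env,
    #         coefficients={'raw_reward': 1.0, 'is_colliding': colliding_coefficient},
    #     )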
    def reset(self, **kwargs) -> np.ndarray:
        self.episode_id += 1
        self.soft_coverage_score_matrix = None

        return self.env.reset(**kwargs)
    # pylint: disable-next=too-many-locals,too-many-branches
    def step(
        self, action: Union[Tuple[np.ndarray, np.ndarray], np.ndarray]
    ) -> Union[
        Tuple[
            Tuple[np.ndarray, np.ndarray],
            Tuple[List[float], List[float]],
            Tuple[List[bool], List[bool]],
            Tuple[List[dict], List[dict]],
        ],
        Tuple[np.ndarray, List[float], List[bool], List[dict]],
    ]:
        observations, rewards, dones, infos = self.env.step(action)

        if self.single_team:
            target_rewards, target_infos = list(rewards), infos
        else:
            target_rewards, target_infos = list(rewards[1]), infos[1]

        # Distance to the destination (or to the nearest non-empty warehouse
        # when the target is not loaded), normalized by the terrain width.
        normalized_goal_distances = np.zeros((self.num_targets,), dtype=np.float64)
        sparse_delivery = self.target_dones.astype(np.float64)
        for t, target in enumerate(self.targets):
            goal = self.target_goals[t]
            warehouse_distances = np.maximum(
                self.target_warehouse_distances[t] - consts.WAREHOUSE_RADIUS, 0.0, dtype=np.float64
            )
            if goal >= 0:
                goal_distance = warehouse_distances[goal]
            elif not target.empty_bits.all():
                goal_distance = warehouse_distances[np.logical_not(target.empty_bits)].min()
            else:
                goal_distance = consts.TERRAIN_WIDTH / 2.0
            normalized_goal_distances[t] = goal_distance / consts.TERRAIN_WIDTH

        # Soft coverage score: the sum over all cameras covering the target,
        # or tanh of the best (negative) score when the target is uncovered.
        soft_coverage_scores = np.zeros((self.num_targets,), dtype=np.float64)
        if 'soft_coverage_score' in self.coefficients:
            self.soft_coverage_score_matrix = AuxiliaryCameraRewards.compute_soft_coverage_scores(
                self.unwrapped
            )
            camera_target_view_mask = self.camera_target_view_mask
            for t, target in enumerate(self.targets):
                if camera_target_view_mask[:, t].any():
                    scores = self.soft_coverage_score_matrix[camera_target_view_mask[:, t], t]
                    soft_coverage_scores[t] = scores.sum()
                else:
                    scores = self.soft_coverage_score_matrix[:, t]
                    soft_coverage_scores[t] = np.tanh(scores.max())

        # Per-target reward: a weighted sum of the auxiliary components,
        # with callable coefficients evaluated on the fly.
        for t, (raw_reward, info) in enumerate(zip(tuple(target_rewards), target_infos)):
            auxiliary_rewards = {
                'raw_reward': raw_reward,
                'coverage_rate': self.coverage_rate,
                'real_coverage_rate': self.real_coverage_rate,
                'mean_transport_rate': self.mean_transport_rate,
                'normalized_goal_distance': normalized_goal_distances[t],
                'sparse_delivery': sparse_delivery[t],
                'soft_coverage_score': soft_coverage_scores[t],
                'is_tracked': self.camera_target_view_mask[..., t].any(),
                'is_colliding': self.targets[t].is_colliding,
                'baseline': 1.0,
            }

            reward = 0.0
            for key, coefficient in self.coefficients.items():
                if callable(coefficient):
                    coefficient = coefficient(
                        t, self.episode_id, self.episode_step, raw_reward, auxiliary_rewards[key]
                    )
                reward += coefficient * auxiliary_rewards[key]
                info.setdefault(key, auxiliary_rewards[key])
                info[f'auxiliary_reward_{key}'] = auxiliary_rewards[key]
                info[f'reward_coefficient_{key}'] = coefficient

            info['reward'] = target_rewards[t] = reward

        # Optionally reduce the individual rewards into one shared team reward.
        reducer = self.REDUCERS.get(self.reduction)
        if reducer is not None:
            shared_reward = reducer(target_rewards)
            target_rewards = [shared_reward] * self.num_targets
            for info in target_infos:
                info['shared_reward'] = shared_reward

        if not self.single_team:
            rewards = (rewards[0], target_rewards)
        else:
            rewards = target_rewards

        return observations, rewards, dones, infos
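

# Usage sketch. The environment id below is an assumption; check the ids
# registered by your MATE installation. The constructor asserts that the
# environment is wrapped with `RepeatedRewardIndividualDone` first:
#
#     import mate
#
#     env = mate.make('MultiAgentTracking-v0')  # hypothetical id
#     env = mate.RepeatedRewardIndividualDone(env)
#     env = mate.AuxiliaryTargetRewards(
#         env,
#         coefficients={'raw_reward': 1.0, 'is_tracked': -0.1, 'is_colliding': -1.0},
#         reduction='none',  # 'mean'/'sum'/'max'/'min' share one reward across targets
#     )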