# Source code for irlc.utils.player_wrapper

# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from gymnasium import logger
from irlc.ex01.agent import Agent
import time
import sys
import gymnasium as gym
import os
import asyncio

# Sentinel string for a user-requested environment reset.
# NOTE(review): not referenced in the visible part of this file; the async loop uses HUMAN_DEMAND_RESET instead.
# Confirm whether this constant is still needed.
HUMAN_REQUEST_RESET = 'reset the environment'

# matplotlib is treated as optional: if the import fails, a warning is logged and ``plt`` is set to None.
try:
    # Forcing the 'TkAgg' backend apparently clashes with scientific mode. It is unclear why it was set in the
    # first place, so it is disabled for now.
    # matplotlib.use('TkAgg')
    import matplotlib.pyplot as plt
except ImportError as e:
    logger.warn('failed to set matplotlib backend, plotting will not work: %s' % str(e))
    plt = None

# pygame is treated as optional too. Only ``Event`` gets a fallback value here; the name ``pygame`` itself
# remains undefined when the import fails.
try:
    import pygame
    from pygame.event import Event
except ImportError as e:
    logger.warn('failed to import pygame. Many of the interactive visualizations will not work: %s' % str(e))
    Event = None

class AgentWrapper(Agent):
    """Wrap an agent to allow a modular transformation of its behaviour.

    This class is the base class for all agent wrappers. A subclass can override
    some methods (typically :meth:`pi` or :meth:`train`) to change the behaviour of
    the wrapped agent without touching the agent's own code. Any public attribute
    that is not found on the wrapper itself is looked up on the wrapped agent.

    .. note::

        Don't forget to call ``super().__init__(agent, env)`` if the subclass overrides :meth:`__init__`.

    """
    def __init__(self, agent, env):
        """Store the wrapped agent and the environment it acts in.

        :param agent: The agent to wrap
        :param env: The environment the agent interacts with
        """
        self.agent = agent
        self.env = env

    def __getattr__(self, name):
        """Forward lookups of missing public attributes to the wrapped agent.

        Python only calls this method when normal attribute lookup has failed.

        :param name: Name of the attribute that was not found on the wrapper
        :return: The attribute of the same name on the wrapped agent
        :raises AttributeError: If ``name`` is private, or if the wrapper has no ``agent`` yet
        """
        if name.startswith('_'):
            raise AttributeError("attempted to get missing private attribute '{}'".format(name))
        if name == 'agent':
            # ``self.agent`` is not set yet (e.g. during copy/unpickling, or when __init__ has not run).
            # Reading ``self.agent`` below would re-enter this method forever, so fail cleanly instead.
            raise AttributeError("'{}' object has no attribute 'agent'".format(type(self).__name__))
        return getattr(self.agent, name)

    @classmethod
    def class_name(cls):
        """Return the name of the wrapper class."""
        return cls.__name__

    def pi(self, state, k, info=None):
        """Return the action selected by the wrapped agent's policy.

        :param state: Current state
        :param k: Current step number
        :param info: Optional info dictionary belonging to ``state``
        :return: The action returned by ``self.agent.pi``
        """
        return self.agent.pi(state, k, info)

    def train(self, *args, **kwargs):
        """Pass all arguments unchanged to the wrapped agent's ``train`` method and return its result."""
        return self.agent.train(*args, **kwargs)

    def __str__(self):
        return '<{}{}>'.format(type(self).__name__, self.agent)

    def __repr__(self):
        return str(self)

    @property
    def unwrapped(self):
        """The innermost agent, with every wrapper layer removed.

        If the wrapped agent does not itself expose ``unwrapped`` it is taken to be the innermost agent.
        """
        return getattr(self.agent, 'unwrapped', self.agent)

# Integer code point of the 'p' character.
# NOTE(review): presumably meant for the pause/autoplay toggle, but the visible code compares
# ``event.unicode == 'p'`` directly and never reads this constant.
PAUSE_KEY = ord('p')
# NOTE(review): looks like a marker for a space-bar press; not referenced in the visible code.
SPACEBAR = "_SPACE_BAR_PRESSED_"
# Sentinel "action" used by _webassembly_interactive to signal that the user asked for an environment reset.
HUMAN_DEMAND_RESET = "reset env."

async def _webassembly_interactive(env, agent, autoplay=False):
    """Run an endless, asyncio-friendly interaction loop between ``env`` and ``agent``.

    The agent is wrapped in an :class:`AsyncPlayerWrapperPygame`. In every step the loop polls pygame
    events (yielding to the event loop while waiting) until it has an action: either one chosen by the user
    through the keyboard or, in autoplay mode, the action suggested by the agent. Pressing ``r`` resets the
    environment. When an episode ends the environment is reset and the loop continues; the coroutine never
    returns on its own.

    :param env: The environment to interact with. If it has an ``async_step`` method it is awaited instead of ``step``.
    :param agent: The agent which suggests actions and is trained on every transition
    :param autoplay: Whether the agent's actions should be applied without waiting for user input
    """
    from pygame import gfxdraw

    # Replace the gfxdraw primitives with equivalents built on pygame.draw.
    # NOTE(review): presumably because gfxdraw is unavailable or unreliable in the webassembly build -- confirm.
    def aapolygon(surface, points, color):
        pygame.draw.polygon(surface, color, points)

    def filled_polygon(surface, points, color):
        pygame.draw.polygon(surface, color, points, width=0)

    def aacircle(surface, x, y, r, color):
        pygame.draw.circle(surface, color, (x, y), r, width=1)

    def filled_circle(surface, x, y, r, color):
        pygame.draw.circle(surface, color, (x, y), r, width=0)

    gfxdraw.aapolygon = aapolygon
    gfxdraw.filled_polygon = filled_polygon
    gfxdraw.aacircle = aacircle
    gfxdraw.filled_circle = filled_circle

    agent = AsyncPlayerWrapperPygame(agent, env, autoplay=autoplay)
    s, info = env.reset()
    k = 0
    await asyncio.sleep(0.2)  # set up graphics.
    while True:
        pi_action = agent.pi(s, k, info)
        a = None
        reset_requested = False
        while True:
            for event in pygame.event.get():
                if event.type == pygame.KEYDOWN and event.key == pygame.K_r:
                    print("Resetting the environment")
                    reset_requested = True
                    break

                a = agent._resolve_event_into_action(event=event, pi_action=pi_action, info=info)
                if a is not None:
                    break
            if reset_requested:
                # A reset must win over autoplay; otherwise the request is silently replaced by pi_action.
                break
            if agent.human_demand_autoplay:
                a = pi_action
                break
            if a is not None:
                break
            await asyncio.sleep(0.05)

        if reset_requested:
            s, info = env.reset()
            k = 0  # a fresh episode starts at step 0, exactly as after a finished episode below.
            continue

        # NOTE(review): the fourth value returned by step (truncation in the gymnasium API) is ignored here.
        sp, reward, done, _, info_sp = (await env.async_step(a)) if hasattr(env, 'async_step') else env.step(a)

        agent.train(s, a, reward, sp, done=done, info_s=info, info_sp=info_sp)

        k = k + 1
        if done:
            sp, info_sp = env.reset()
            k = 0

        s = sp
        info = info_sp
        await asyncio.sleep(0.01)  # release loop.






class PlayWrapperPygame(AgentWrapper):
    """Agent wrapper that lets a human control the agent through pygame keyboard events.

    The wrapper blocks in :meth:`pi` until an action is available:

    - a key found in ``keys_to_action`` selects the corresponding action,
    - the space bar selects the action suggested by the wrapped agent,
    - ``p`` toggles autoplay, in which the wrapped agent's actions are used without waiting,
    - any other key is handed to ``env.keypress`` when the environment has such a method.

    Creating the wrapper also replaces ``agent.train`` with a version that renders the environment after
    each training call, and stores the agent on the environment as ``env.agent``.
    """
    # When true, the environment is rendered after every call to the wrapped agent's train method.
    render_after_train = True

    def __init__(self, agent : Agent, env : gym.Env, keys_to_action=None, autoplay=False):
        """
        :param agent: The agent to wrap
        :param env: The environment the agent interacts with
        :param keys_to_action: Dictionary mapping key tuples of the form ``(key,)`` to actions. If ``None``, it is
            obtained from ``get_keys_to_action()`` on ``env``, ``env.env`` or ``env.unwrapped`` (first one available);
            if none of them has it, no keys are mapped.
        :param autoplay: Whether the wrapped agent's actions should be used without waiting for user input
        """
        super().__init__(agent, env)
        if keys_to_action is None:
            if hasattr(env, 'get_keys_to_action'):
                keys_to_action = env.get_keys_to_action()
            elif hasattr(env, "env") and hasattr(env.env, 'get_keys_to_action'):
                keys_to_action = env.env.get_keys_to_action()
            elif hasattr(env.unwrapped, 'get_keys_to_action'):
                keys_to_action = env.unwrapped.get_keys_to_action()
            else:
                keys_to_action = dict()
        self.keys_to_action = keys_to_action
        self.human_wants_restart = False
        self.human_sets_pause = False
        self.human_agent_action = -1
        self.human_demand_autoplay = autoplay

        # Patch the agent's train method so the environment is re-drawn after every training step.
        original_train = agent.train

        def train_(s, a, r, sp, done=False, info_s=None, info_sp=None):
            # The trailing arguments are optional so callers that omit them keep working after patching.
            original_train(s, a, r, sp, done, info_s, info_sp)
            if self.render_after_train:
                env.render()

        agent.train = train_
        env.agent = agent

    def _quit(self):
        """Close the environment, shut pygame down and terminate the program via ``sys.exit``."""
        if hasattr(self, 'env'):
            self.env.close()
        time.sleep(0.1)
        pygame.display.quit()
        time.sleep(0.1)
        pygame.quit()
        time.sleep(0.1)
        sys.exit()

    def _is_masked(self, a, info):
        """Return whether ``info`` contains a ``'mask'`` entry that marks action ``a`` as unavailable (0)."""
        if info is None or 'mask' not in info:
            return False
        from irlc.utils.common import DiscreteTextActionSpace
        if isinstance(self.env.action_space, DiscreteTextActionSpace):
            # Text actions are looked up in the mask by their position in the action list.
            aint = self.env.action_space.actions.index(a)
        else:
            aint = a
        return info['mask'][aint] == 0

    def _first_unmasked_action(self, info):
        """Return the action at ``info['mask'].argmax()``, translated to text for text action spaces."""
        from irlc.utils.common import DiscreteTextActionSpace
        a = info['mask'].argmax()
        if isinstance(self.env.action_space, DiscreteTextActionSpace):
            a = self.env.action_space.actions[a]
        return a

    @staticmethod
    def _legal_pacman_action(state, a):
        """Return ``"Stop"`` if ``state`` is a Pacman ``GameState`` and ``a`` is not in ``state.A()``; otherwise ``a``."""
        try:
            from irlc.pacman.gamestate import GameState
        except ImportError:
            # Without the pacman package the state cannot be a pacman game state.
            return a
        if isinstance(state, GameState) and a not in state.A():
            return "Stop"
        return a

    def _resolve_event_into_action(self, event : "pygame.event.Event", pi_action, info : dict):
        """Translate a single pygame event into an action, if the event selects one.

        A quit event terminates the program. The ``p`` key toggles autoplay as a side effect, and keys that
        are not otherwise handled are passed on to ``env.keypress`` when it exists.

        :param event: The pygame event to interpret
        :param pi_action: The action suggested by the wrapped agent (used for the space bar and for autoplay)
        :param info: Info dictionary of the current state; an optional ``'mask'`` entry marks unavailable actions
        :return: The selected action, or ``None`` if the event does not select an action
        """
        if event.type == pygame.QUIT:
            self._quit()

        if event.type != pygame.KEYDOWN:
            return None

        if event.key == pygame.K_SPACE:
            return pi_action

        if (event.key,) in self.keys_to_action:
            a = self.keys_to_action[(event.key,)]
            if not self._is_masked(a, info):
                return a
            # The action was masked. This means that this action is unavailable, and we should select another.
            try:
                from irlc.pacman.pacman_environment import PacmanEnvironment
            except ImportError:
                PacmanEnvironment = None  # not a pacman installation, so the generic fallback applies.
            if PacmanEnvironment is not None and isinstance(self.env, PacmanEnvironment):
                return "Stop"
            # The default is to select one of the available actions from the mask.
            return self._first_unmasked_action(info)

        if event.unicode == 'p':
            # Toggle autoplay; when it was just switched on the agent's action is used right away.
            self.human_demand_autoplay = not self.human_demand_autoplay
            return pi_action if self.human_demand_autoplay else None

        # try to pass event on to the game.
        if hasattr(self.env, 'keypress'):
            self.env.keypress(event)
        return None

    def pi(self, state, k, info=None):
        """Block until an action is available and return it.

        The wrapped agent's policy is always evaluated first. The method then polls pygame events every 0.1
        seconds until the user has selected an action or autoplay is active, in which case the agent's own
        action is returned.

        :param state: Current state
        :param k: Current step number
        :param info: Info dictionary of the current state; an optional ``'mask'`` entry marks unavailable actions
        :return: The action to apply to the environment
        """
        pi_action = super().pi(state, k, info) # make sure super class pi method is called in case it has side effects.
        a = None
        while True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    self._quit()

                if event.type != pygame.KEYDOWN:
                    continue

                if event.key == pygame.K_SPACE:
                    a = pi_action
                    break
                elif (event.key,) in self.keys_to_action:
                    a = self.keys_to_action[(event.key,)]
                    if self._is_masked(a, info):
                        # The action is unavailable; the default is to select one of the available actions.
                        a = self._first_unmasked_action(info)
                    break
                elif event.unicode == 'p':
                    # Toggle autoplay.
                    self.human_demand_autoplay = not self.human_demand_autoplay
                    break
                elif hasattr(self.env, 'keypress'):
                    # try to pass event on to the game.
                    self.env.keypress(event)

            if self.human_demand_autoplay:
                a = pi_action

            if a is not None:
                return self._legal_pacman_action(state, a)
            time.sleep(0.1)




class AsyncPlayerWrapperPygame(PlayWrapperPygame):
    """Non-blocking variant of :class:`PlayWrapperPygame`.

    Its :meth:`pi` only evaluates the wrapped agent's policy and never waits for keyboard input.
    """
    # render_after_train = False

    def pi(self, state, k, info=None):
        """Return the wrapped agent's action for ``state`` without polling any pygame events."""
        # The wrapped agent's policy is still evaluated in case it has side effects.
        return self.agent.pi(state, k, info)

    # def _pygame_event_to_action(self, event : Event, pi_action):
    #     pass

    def _post_process_action_for_visualization(self):
        """Placeholder hook; it currently does nothing."""
        return None


def interactive(env : gym.Env, agent: Agent, autoplay=False) -> "tuple[gym.Env, Agent]":
    """
    This function is used for visualizations. It can

    - Allow you to input keyboard commands to an environment
    - Allow you to save results
    - Visualize reinforcement-learning agents in the gridworld environment.

    by adding a single extra line ``env, agent = interactive(env,agent)``.
    The following shows an example:

    >>> from irlc.gridworld.gridworld_environments import BookGridEnvironment
    >>> from irlc import train, Agent, interactive
    >>> env = BookGridEnvironment(render_mode="human", zoom=0.8) # Pass render_mode='human' for visualization.
    >>> env, agent = interactive(env, Agent(env))                # Make the environment interactive. Note that it needs an agent.
    >>> train(env, agent, num_episodes=2)                        # You can train and use the agent and environment as usual.
    >>> env.close()

    It also enables you to visualize the environment at a matplotlib figure or save it as a pdf file using
    ``env.plot()`` and ``env.savepdf('my_file.pdf')``.

    All demos and figures in the notes are made using this function.

    :param env: A gym environment (an instance of the ``Env`` class)
    :param agent: An agent (an instance of the ``Agent`` class)
    :param autoplay: Whether the simulation should be unpaused automatically
    :return: An environment and agent which have been slightly updated to make them interact with each other.
        You can use them as usual with the ``train``-function.
    """
    from PIL import Image # Let's put this one here in case we run the code in headless mode.
    agent = PlayWrapperPygame(agent, env, autoplay=autoplay)

    def _render_image():
        # Render one frame as an rgb array. The previous render mode is restored even if rendering fails,
        # so the environment is never left stuck in 'rgb_array' mode.
        previous_mode = env.render_mode
        env.render_mode = 'rgb_array'
        try:
            frame = env.render()
        finally:
            env.render_mode = previous_mode
        return Image.fromarray(frame)

    def plot():
        # Draw the current state of the environment into the current matplotlib figure.
        im = _render_image()
        plt.imshow(im)
        plt.axis('off')
        plt.tight_layout()

    def savepdf(file):
        # Save a snapshot of the environment. A name ending in '.png' gives a png image; any other name gives
        # a pdf (the '.pdf' extension is added when it is missing).
        im = _render_image()
        if file.endswith(".png"):
            sf, fext = file[:-4], 'png'
        elif file.endswith(".pdf"):
            sf, fext = file[:-4], 'pdf'
        else:
            sf, fext = file, 'pdf'

        sf = f"{sf}.{fext}"
        dn = os.path.dirname(sf)
        if dn:
            os.makedirs(dn, exist_ok=True)
        print("Saving snapshot of environment to", os.path.abspath(sf))
        if fext == 'png':
            im.save(sf)
            from irlc import _move_to_output_directory
            _move_to_output_directory(sf)
        else:
            plt.figure(figsize=(16, 16))
            plt.imshow(im)
            plt.axis('off')
            plt.tight_layout()
            from irlc import savepdf
            savepdf(sf, verbose=True)
            plt.show()

    env.plot = plot
    env.savepdf = savepdf
    return env, agent

def main():
    """Demo: plot a gridworld environment, then let a random agent interact with it for 100 episodes."""
    from irlc.gridworld.gridworld_environments import BookGridEnvironment
    from irlc import train, Agent

    env = BookGridEnvironment(render_mode="human", zoom=0.8)  # Pass render_mode='human' for visualization.
    env, agent = interactive(env, Agent(env))  # Make the environment interactive. Note that it needs an agent.
    env.reset()  # We always need to call reset
    env.plot()  # Plot the environment.
    env.close()

    # Interaction with a random agent.
    env = BookGridEnvironment(render_mode="human", zoom=0.8)  # Pass render_mode='human' for visualization.
    env, agent = interactive(env, Agent(env))  # Make the environment interactive. Note that it needs an agent.
    train(env, agent, num_episodes=100)  # You can train and use the agent and environment as usual.
    env.close()


if __name__ == "__main__":
    main()