# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
from math import floor
from gymnasium.spaces.box import Box
import numpy as np
from irlc.ex09.rl_agent import _masked_actions
from irlc.utils.common import defaultdict2
class FeatureEncoder:
"""
The idea behind linear function approximation of :math:`Q`-values is that
- We initialize (and eventually learn) a :math:`d`-dimensional weight vector :math:`w \\in \\mathbb{R}^d`
- We assume there exists a function that computes a :math:`d`-dimensional feature vector :math:`x(s,a) \\in \\mathbb{R}^d`
- The :math:`Q`-values are then represented as
.. math::
Q(s,a) = x(s,a)^\\top w
Learning is therefore entirely about updating :math:`w`.
The following example shows how you initialize the linear :math:`Q`-values and compute them in a given state:
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8)
>>> s, _ = env.reset()
>>> a = env.action_space.sample()
>>> Q(s,a) # Compute a Q-value.
>>> Q.d # Get the number of dimensions
>>> Q.x(s,a)[:4] # Get the first four coordinates of the x-vector
>>> Q.w[:4] # Get the first four coordinates of the w-vector
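Since the Q-values are linear in :math:`w`, learning amounts to changing ``Q.w``. A minimal sketch of a
semi-gradient update (see (SB18)), assuming a hypothetical target value ``y`` and a learning rate of ``0.1``:
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8)
>>> s, _ = env.reset()
>>> a = env.action_space.sample()
>>> y = 1.                                       # Hypothetical target value, e.g. y = r + gamma * Q(sp, ap)
>>> Q.w = Q.w + 0.1 * (y - Q(s, a)) * Q.x(s, a)  # Semi-gradient update of the weights
>>> Q(s, a)                                      # The Q-value has moved towards y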
"""
def __init__(self, env):
"""
Initialize the feature encoder. It requires an environment to determine the number of actions and the dimension of the state space.
:param env: A gymnasium environment (an instance of ``gymnasium.Env``).
"""
self.env = env
self.w = np.zeros((self.d, ))
self._known_masks = {}
def q_default(s):
from irlc.utils.common import DiscreteTextActionSpace
if s in self._known_masks:
return {a: 0 for a in range(self.env.action_space.n) if
self._known_masks[s][(a - self.env.action_space.start) if not isinstance(self.env.action_space, DiscreteTextActionSpace) else a] == 1}
else:
return {a: 0 for a in range(self.env.action_space.n)}
# qfun = lambda s: OrderedDict({a: 0 for a in (env.P[s] if hasattr(env, 'P') else range(env.action_space.n))})
self.q_ = defaultdict2(lambda s: q_default(s))
@property
def d(self):
""" Get the number of dimensions of :math:`w`
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8) # as in (SB18)
>>> Q.d
"""
raise NotImplementedError()
def x(self, s, a):
"""
Computes the :math:`d`-dimensional feature vector :math:`x(s,a)`
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8) # as in (SB18)
>>> s, info = env.reset()
>>> x = Q.x(s, env.action_space.sample())
:param s: A state :math:`s`
:param a: An action :math:`a`
:return: Feature vector :math:`x(s,a)`
"""
raise NotImplementedError()
def get_Qs(self, state, info_s=None):
"""
This is a helper function for internal use. It returns the actions available in ``state`` together with their Q-values.
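A minimal usage sketch:
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8)
>>> s, info = env.reset()
>>> actions, Qs = Q.get_Qs(s, info)
>>> actions  # All actions available in s
>>> Qs       # Their Q-values (all zero before any learning)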
:param state: The state in which to compute the available actions and Q-values.
:param info_s: The ``info``-dictionary associated with ``state`` (used for action masking, if available).
:return: A tuple ``(actions, Qs)`` of the available actions and their corresponding Q-values.
"""
if info_s is not None and 'mask' in info_s and not isinstance(state, np.ndarray):
if state not in self._known_masks:
self._known_masks[state] = info_s['mask']
# Probably a good idea to check the Q-values are okay...
avail_actions = _masked_actions(self.env.action_space, info_s['mask'])
self.q_[state] = {a: self.q_[state][a] for a in avail_actions}
from irlc.pacman.pacman_environment import PacmanEnvironment
from irlc.pacman.pacman_utils import Actions
if isinstance(state, np.ndarray):
actions = tuple(range(self.env.action_space.n))
elif isinstance(self.env, PacmanEnvironment):
# actions = Actions
# actions = tuple(Actions._directions.keys())
actions = _masked_actions(self.env.action_space, info_s['mask'])
actions = tuple([self.env.action_space.actions[n] for n in actions])
else:
actions = tuple(self.q_[state].keys())
# if isinstance(self.env, PacmanEnvironment):
# # TODO: Make smarter masking.
# actions = [a for a in actions if a in self.env.A(state)]
# actions =
Qs = tuple([self(state,a) for a in actions])
# TODO: Implement masking and masking-cache.
return actions, Qs
def get_optimal_action(self, state, info=None):
"""
For a given state ``state``, this function returns the optimal action for that state.
.. math::
a^* = \\arg\\max_a Q(s,a)
An example:
.. runblock:: pycon
>>> from irlc.ex09.rl_agent import TabularAgent
>>> class MyAgent(TabularAgent):
... def pi(self, s, k, info=None):
... a_star = self.Q.get_optimal_action(s, info)
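And a standalone sketch (no learning has taken place yet, so ties between the all-zero Q-values are broken at random):
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8)
>>> s, info = env.reset()
>>> Q.get_optimal_action(s, info)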
:param state: The state :math:`s` in which to find the optimal action
:param info: The ``info``-dictionary corresponding to this state
:return: The optimal action according to the Q-values :math:`a^*`
"""
actions, Qa = self.get_Qs(state, info)
if len(actions) == 0:
print("Bad actions list")
a_ = np.argmax(np.asarray(Qa) + np.random.rand(len(Qa)) * 1e-8)  # Tiny random noise breaks ties between equal Q-values at random
return actions[a_]
def __call__(self, s, a):
"""
Evaluate the Q-value for the given state and action. An example:
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> env = gym.make('MountainCar-v0')
>>> Q = LinearQEncoder(env, tilings=8) # as in (SB18)
>>> s, info = env.reset()
>>> Q(s, env.action_space.sample()) # Compute Q(s,a)
:param s: A state :math:`s`
:param a: An action :math:`a`
:return: The Q-value :math:`Q(s,a) = x(s,a)^\\top w`
"""
return self.x(s, a) @ self.w
def __getitem__(self, item):
raise Exception("Hi! You tried to access linear Q-values as Q[s,a]. You need to use Q(s,a). This choice signifies they are not represented as a table, but as a linear combination x(s,a)^T w")
# s,a = item
# return self.__call__(s, a)
def __setitem__(self, key, value):
raise Exception("Oy! You tried to set a linearly encoded Q-value as in Q[s, a] = new_q_value.\n This is not possible since they are represented as x(s,a)^T w. Rewrite the expression to update Q.w.")
class DirectEncoder(FeatureEncoder):
def __init__(self, env):
self.d_ = np.prod( env.observation_space.shape ) * env.action_space.n
# self.d_ = len(self.x(env.reset(), env.action_space.n))
super().__init__(env)
def x(self, s, a):
xx = np.zeros( (self.d,))
n = s.size
xx[n * a:n*(a+1) ] = s
return xx
@property
def d(self):
return self.d_
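# The GridworldXYEncoder encodes a gridworld state s = (x, y) by placing the two coordinates in the
# block belonging to the chosen action, so that d = 2 * n_actions.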
class GridworldXYEncoder(FeatureEncoder):
def __init__(self, env):
self.env = env
self.na = self.env.action_space.n
self.ns = 2
super().__init__(env)
@property
def d(self):
return self.na*self.ns
def x(self, s, a):
x,y = s
xx = [np.zeros(self.ns) for _ in range(self.na)]
xx[a][0] = x
xx[a][1] = y
# return xx[a]
xx = np.concatenate(xx)
return xx
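# The SimplePacmanExtractor wraps the hand-crafted Pacman features computed by SimpleExtractor
# (a bias term, nearby ghosts, and food-related features) into a feature vector x(s, a) whose
# coordinates are given by self.fields.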
class SimplePacmanExtractor(FeatureEncoder):
def __init__(self, env):
self.env = env
from irlc.pacman.feature_extractor import SimpleExtractor
# from reinforcement.featureExtractors import SimpleExtractor
self._extractor = SimpleExtractor()
self.fields = ["bias", "#-of-ghosts-1-step-away", "#-of-ghosts-1-step-away", "eats-food", "closest-food"]
super().__init__(env)
def x(self, s, a):
xx = np.zeros_like(self.w)
# ap = self.env._actions_gym2pac[a]
ap = a
for k, v in self._extractor.getFeatures(s, ap).items():
xx[self.fields.index(k)] = v
return xx
@property
def d(self):
return len(self.fields)
class LinearQEncoder(FeatureEncoder):
def __init__(self, env, tilings=8, max_size=2048):
"""
Implements the tile-coding feature encoder described in (SB18).
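Depending on the observation space, the encoder uses either tile coding (for continuous ``Box`` observation
spaces) or a fixed sparse (one-hot style) representation (for discrete observation spaces). A minimal sketch;
'CliffWalking-v0' is only used here as an example of an environment with a discrete observation space:
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex11.feature_encoder import LinearQEncoder
>>> LinearQEncoder(gym.make('MountainCar-v0'), tilings=8).d    # Box observations: tile coding
>>> LinearQEncoder(gym.make('CliffWalking-v0')).d              # Discrete observations: fixed sparse representation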
:param env: The gymnasium environment we wish to solve.
:param tilings: Number of tilings (translations). Typically 8.
:param max_size: Maximum number of dimensions.
"""
if isinstance(env.observation_space, Box):
os = env.observation_space
low = os.low
high = os.high
scale = tilings / (high - low)
hash_table = IHT(max_size)
self.max_size = max_size
def tile_representation(s, action):
s_ = list( (s*scale).flat )
active_tiles = tiles(hash_table, tilings, s_, [action]) # (s * scale).tolist()
# if 0 not in active_tiles:
# active_tiles.append(0)
return active_tiles
self.get_active_tiles = tile_representation
else:
# raise Exception("Implement in new class")
#
# Use Fixed Sparse Representation. See:
# https://castlelab.princeton.edu/html/ORF544/Readings/Geramifard%20-%20Tutorial%20on%20linear%20function%20approximations%20for%20dynamic%20programming%20and%20RL.pdf
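# With a single Discrete(n) observation space this reduces to a one-hot encoding of the
# (state, action)-pair: the single active index is simply s + n * action.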
ospace = env.observation_space
simple = False
if not isinstance(ospace, tuple):
ospace = (ospace,)
simple = True
sz = []
for j,disc in enumerate(ospace):
sz.append( disc.n )
total_size = sum(sz)
csum = np.cumsum(sz,) - sz[0]
self.max_size = total_size * env.action_space.n
def fixed_sparse_representation(s, action):
if simple:
s = (s,)
s_encoded = [cs + ds + total_size * action for ds,cs in zip(s, csum)]
return s_encoded
self.get_active_tiles = fixed_sparse_representation
super().__init__(env)
def x(self, s, a):
x = np.zeros(self.d)
at = self.get_active_tiles(s, a)
x[at] = 1.0
return x
@property
def d(self):
return self.max_size
"""
The following code contains the tile-coding utilities copied from:
http://incompleteideas.net/tiles/tiles3.py-remove
"""
class IHT:
"""Structure to handle collisions"""
def __init__(self, size_val):
self.size = size_val
self.overfull_count = 0
self.dictionary = {}
def count(self):
return len(self.dictionary)
def full(self):
return len(self.dictionary) >= self.size
def get_index(self, obj, read_only=False):
d = self.dictionary
if obj in d:
return d[obj]
elif read_only:
return None
size = self.size
count = self.count()
if count >= size:
if self.overfull_count == 0:
print('IHT full, starting to allow collisions')
self.overfull_count += 1
return hash(obj) % self.size
else:
d[obj] = count
return count
def hash_coords(coordinates, m, read_only=False):
if isinstance(m, IHT): return m.get_index(tuple(coordinates), read_only)
if isinstance(m, int): return hash(tuple(coordinates)) % m
if m is None: return coordinates
def tiles(iht_or_size, num_tilings, floats, ints=None, read_only=False):
"""returns num-tilings tile indices corresponding to the floats and ints"""
if ints is None:
ints = []
qfloats = [floor(f * num_tilings) for f in floats]
tiles = []
for tiling in range(num_tilings):
tilingX2 = tiling * 2
coords = [tiling]
b = tiling
for q in qfloats:
coords.append((q + b) // num_tilings)
b += tilingX2
coords.extend(ints)
tiles.append(hash_coords(coords, iht_or_size, read_only))
return tiles