In [1]:
import numpy as np

In [2]:
np.__version__

'1.26.1'

# Example 1 - Manual implementation

A straightforward way to implement reinforcement learning is to define the environment with a number of states, where each state is assigned a number. In our example, we will apply reinforcement learning to a maze-solving algorithm where each space is assigned a sequential number:

    S..   0 1 2
    ...   3 4 5
    E..   6 7 8

Additionally, we will define 4 possible moves, where each will also have a corresponding number:

    possible moves

        0
      3   1
        2

With this, we can define our problem space using a matrix of all possible transitions between spaces:

In [3]:
maze = [['S', '.', '.'],
        ['.', '.', '.'],
        ['E', '.', '.']]

rewards = np.ones((len(maze)*len(maze[0]), 4))*-1
transitions = np.ones((len(maze)*len(maze[0]), 4))*-1


transitions = transitions.astype(int)



transitions[0][1] = 1
transitions[0][2] = 3
transitions[1][1] = 2
transitions[1][2] = 4
transitions[1][3] = 0
transitions[2][2] = 5
transitions[2][3] = 1
transitions[3][0] = 0
transitions[3][1] = 4
transitions[3][2] = 6
transitions[4][0] = 1
transitions[4][1] = 5
transitions[4][2] = 7
transitions[4][3] = 3
transitions[4][0] = 2
transitions[5][2] = 8
transitions[5][3] = 4
transitions[6][0] = 3
transitions[6][1] = 7
transitions[7][0] = 4
transitions[7][1] = 8
transitions[7][3] = 6
transitions[8][0] = 5
transitions[8][3] = 7





Additionally, we can define the rewards for each move. To keep this simple, we will reward the moves that reach the exit

In [4]:
rewards[3][2] = 10
rewards[7][3] = 10

rewards

array([[-1., -1., -1., -1.],
       [-1., -1., -1., -1.],
       [-1., -1., -1., -1.],
       [-1., -1., 10., -1.],
       [-1., -1., -1., -1.],
       [-1., -1., -1., -1.],
       [-1., -1., -1., -1.],
       [-1., -1., -1., 10.],
       [-1., -1., -1., -1.]])

## Value Iteration

The value iteration method attempts to approximate the optimal value function for each possible state. It scores each state so that states that are more likely to lead to larger rewards get a larger value. To compute this, we need to know the possible state transitions ahead of time. In our case, this is the matrix transitions. Then, we iteratively update the score of each state based on the best possible move we can make from this state.

In [5]:
# Value iteration method
def value_iteration(T, R, gamma = 0.9, iterations = 100):
    #initialize starting state values to 0
    # V - The vector of scores for each state
    V = np.zeros(len(R))
    # a set of actions to iterate through
    A = list(range(len(R[0])))
    # a set of states to iterate through
    S = list(range(len(R)))
  
    # Q - The vector of scores for each [state, action] pair
    # initialize the matrix Q (states X actions)
    # everything should start at 0
    Q = np.zeros((len(R), len(R[0])))
  
    # Iterate trough every state and action multiple times
    for i in range(iterations):
        #for each state "s"
        for s in S:
            #for each action "a"
            for a in A:
                # Update the score for the action "a" from the state "s".
                # It should be equal to the score in the reward matrix R + 
                # some factor gamma times the score of the state we reached
                # with that action. Gamma is the discount factor and should 
                # be set to a number lower than 1. In a maze-solving problem,
                # this ensures that the further away a room is from the exit,
                # the smaller its score will be.
                #print(T)
                if T[s, a] == -1:
                    print(s, a)
                    continue
                Q[s,a] = R[s,a] + gamma*1*V[T[s,a]]
                # Update the scores for the state. The score is equal to the
                # score of the best possible move we can make from that state.
                V[s] = max(Q[s])
    # Return the scores of each state
    return V

In [6]:
value_iteration(transitions, rewards)

0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0
2 0
2 1
3 3
5 0
5 1
6 2
6 3
7 2
8 1
8 2
0 0
0 3
1 0


array([42.10526312, 36.89473681, 32.20526313, 47.89473681, 42.10526313,
       36.89473682, 42.10526313, 47.89473682, 42.10526313])

## Q-learning

Q-learning is similar to value iteration but does not update all the states in each iteration. Instead, it uses an agent that randomly explores the environment. When it moves from state A to state B it increases or decreases the score of state A based on the reward that move obtained.

In [7]:
# Q-learning
def qlearning(T, R, F, gamma = 0.9):

    # the number of states
    nstates = len(T)
    nactions = len(T[0])
  
    # initialize the matrix Q
    Q = np.zeros((nstates, nactions))
  
    alpha = 1
  
    while alpha > 0.1:
  
        # We want an agent that randomly explores the environment
        # randomly select a starting state
        cur_state = np.random.choice(list(range(nstates)))
    
        # move around until we reach one of the final states
        while cur_state not in F:
    
            # see which actions are possible in the current state
            possible_actions = [x[0] for x in enumerate(T[cur_state]) if x[1] != -1]
      
            # randomly select the next action
            action = np.random.choice(possible_actions)
      
            # the selected action determines the next state
            next_state = T[cur_state, action]
      
            # update Q for the current state and the selected action
            # This is similar to value iteration, with some changes:
            #     - We no longer use the V vector. Instead this is incorporated
            #       directly into the equation.
            #     - We use alpha to ensure later iterations don't chage the Q
            #       matrix as much as earlier iterations
            #     - We subtract Q[cur.state, action] at the end to ensure moving
            #       from a better state to a worse state is penalized even if 
            #       the worse state still has a good score.
            Q[cur_state][action] = Q[cur_state][action] + alpha * (R[cur_state][action] + gamma * np.max(Q[next_state]) - Q[cur_state, action])
      
            # Execute the move
            cur_state = next_state
    
    
        # Lower alpha during every iteration
        alpha = alpha * 0.999
  
  
    # Return the Q matrix, which is normalized so that the scores look nicer.
    return Q / np.max(Q)


In [8]:
qlearning(transitions, rewards, [6])

array([[0.    , 0.458 , 0.8   , 0.    ],
       [0.    , 0.3122, 0.62  , 0.62  ],
       [0.    , 0.    , 0.458 , 0.458 ],
       [0.62  , 0.62  , 1.    , 0.    ],
       [0.3122, 0.458 , 0.8   , 0.8   ],
       [0.    , 0.    , 0.62  , 0.62  ],
       [0.    , 0.    , 0.    , 0.    ],
       [0.62  , 0.62  , 0.    , 1.    ],
       [0.458 , 0.    , 0.    , 0.8   ]])

In [9]:
#!pip install gymnasium

In [10]:
#!pip install pygame

# Example 2 - OpenAI gymnasium library

In the previous example, we defined the problem space as a matrix of all possible state transitions. However, this can be impractical for larger problems. Instead, we can represent as python classes. In this example, we will use the gymnasium library:

    Gymnasium is an open source Python library for developing and comparing reinforcement learning algorithms by   
    providing a standard API to communicate between learning algorithms and environments, as well as a standard set of 
    environments compliant with that API. 
    
The library represents environments using the gym.Env class. Each environment must contain the following main methods:


    step() - Updates an environment with actions returning the next agent observation, the reward for taking that actions, if the environment has terminated or truncated due to the latest action and information from the environment about the step, i.e. metrics, debug info.

    reset() - Resets the environment to an initial state, required before calling step. Returns the first agent observation for an episode and information, i.e. metrics, debug info.

    render() - Renders the environments to help visualise what the agent see, examples modes are “human”, “rgb_array”, “ansi” for text.

    close() - Closes the environment, important when external software is used, i.e. pygame for rendering, databases

Let's see how gymnasium implements a simple maze problem (without walls):

In [11]:
import gymnasium as gym
from gymnasium import spaces
from gymnasium.vector.utils import batch_space
import pygame
import numpy as np

In [12]:
class GridWorldEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode=None, size=5):
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window

        # We have 4 actions, corresponding to "right", "up", "left", "down"
        # We will use action_space to sample moves
        self.action_space = spaces.Discrete(4)

        """
        The following dictionary maps abstract actions from `self.action_space` to
        the direction we will walk in if that action is taken.
        I.e. 0 corresponds to "right", 1 to "up" etc.
        """
        self._action_to_direction = {
            0: np.array([1, 0]),
            1: np.array([0, 1]),
            2: np.array([-1, 0]),
            3: np.array([0, -1]),
        }
        
        
        """
        Gymnasium allows us to display our problem in a human-friendly way.
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.render_mode = render_mode
        self.window = None
        self.clock = None
        
        """
        Generate the starting and ending locations. We use a generator with a set seed so that
        we always get the same maze.
        """
        self.generator, _ = gym.utils.seeding.np_random(seed=3025)
        
        # Choose the agent's location uniformly at random
        self._starting_agent_location = self.generator.integers(0, self.size, size=2, dtype=int)
        self._agent_location = self._starting_agent_location
        
        # We will sample the target's location randomly until it does not coincide with the agent's location
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.generator.integers(
                0, self.size, size=2, dtype=int
            )
            
    # Observations encode the current state of our problem. In this case, we will
    # represent this with the locations of the agent and target
    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}
    
    # Additionally, we can return useful information. We won't be using this,
    # but we could use the distance to target location as a possible reward/penalty
    def _get_info(self):
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            )
        }

    """
    reset() initializes the starting state of our game. Here, we move the agent to
    the starting position
    """
    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        self._agent_location = self._starting_agent_location

        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()
        
        # reset() always returns the observation (the state of our game) and 
        # auxillary information
        return observation, info
    
    """
    step() tells us what happens during each step of our game. In our case, we need to
    pick a random direction and move the agent in that direction.
    
    Additionally, step() also calculates the rewards of our action. In this case, we will
    simply reward each step that moves into the target.
    """
    def step(self, action):
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        
        # We use `np.clip` to make sure we don't leave the grid
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )
        
        # An episode is done if the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1 if terminated else 0  # Binary sparse rewards
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()
        """
        step() returns 5 values:
            * observation: the state of the environment after the step
            * reward: the reward of the step
            * terminated: whether the step ended the game
            * truncated: whether the game ended in an unusual way. In our case, this is always False
            * info: additional auxiliary information. In our case, the distance to the target
        """
        return observation, reward, terminated, False, info
    
    """
    render() is used to display the game state in a human friendly way. In our case, we use
    the pygame library to draw the current state.
    """
    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (self.window_size, self.window_size)
            )
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # First we draw the target
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )
        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
        
    """
    close() cleans up the environment. In our case we simply close the pygame window
    """
    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

To run our environment we first create the class and call reset() to generate a new game.

We then generate each move using env.action_space.sample() and apply the move using env.step(action). If the game has ended, we call reset() again.

In [13]:
env = GridWorldEnv(render_mode="human")
observation, info = env.reset()

In [14]:
for _ in range(1000):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

KeyboardInterrupt: 

In [15]:
env.close()

## Training the agent

At this point, every action in our game is entirely random (we generate them using env.action_space.sample()). In order to train our agent, we again need to implement some version of reinforcement learning. We will do this by creating a new class (MazeAgent) that will interact with the  GridWorldEnv environment. This class will implement q-learning using two key methods:

    get_actions(obs): Instead of randomly sampling actions, get_actions will return the optimal action based on the learned policy
    
    update(obs, action, reward, terminated, next_obs): This function will update the Q-value of a given action (i.e., transition from obs -> next_obs) based on the reward obtained by the action
    
We will also no longer save the Q-values into a matrix. Instead, we will use the defaultdict class which acts as a normal dictionary when given a valid key or returns a 0 if the key is not in a dictionary. We will use this to save the computed q-values while returning 0 for actions without an existing q-value.

In [None]:
from collections import defaultdict

class MazeAgent:
        def __init__(self, 
                     learning_rate=0.01,
                     initial_epsilon=1.0,
                     epsilon_decay=1.0 / (10000 / 2),
                     final_epsilon=0.1,
                     discount_factor=0.95):

            """
            Initialize a Reinforcement Learning agent with an empty dictionary
            of state-action values (q_values), a learning rate and an epsilon.
            
            

            Args:
                learning_rate: The learning rate
                initial_epsilon: The initial epsilon value
                epsilon_decay: The decay for epsilon
                final_epsilon: The final epsilon value
                discount_factor: The discount factor for computing the Q-value
            """
            self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

            self.lr = learning_rate
            self.discount_factor = discount_factor

            self.epsilon = initial_epsilon
            self.epsilon_decay = epsilon_decay
            self.final_epsilon = final_epsilon

            self.training_error = []
    
        def get_action(self, obs):
            """
            Returns the best action with probability (1 - epsilon)
            otherwise a random action with probability epsilon to ensure exploration.
            """
            # with probability epsilon return a random action to explore the environment
            if np.random.random() < self.epsilon:
                return env.action_space.sample()

            # with probability (1 - epsilon) act greedily (exploit)
            # Look at the q_values for a given observation and return the best action
            else:
                return int(np.argmax(self.q_values[str(obs)]))

        def update( self,
                    obs,
                    action,
                    reward,
                    terminated,
                    next_obs):
            """Updates the Q-value of an action usint the q-learning equation."""
            
            #future_q_value = (not terminated) * np.max(self.q_values[str(next_obs)])
            future_q_value = np.max(self.q_values[str(next_obs)])
            #print(not terminated, future_q_value, np.max(self.q_values[str(next_obs)]), self.epsilon)
            temporal_difference = (
                reward + self.discount_factor * future_q_value - self.q_values[str(obs)][action]
            )

            self.q_values[str(obs)][action] = (
                self.q_values[str(obs)][action] + self.lr * temporal_difference
            )
            self.training_error.append(temporal_difference)
            
            
        def decay_epsilon(self):
            self.epsilon = max(self.final_epsilon, self.epsilon - epsilon_decay)


Now we can train our agent. First, initialize the MazeAgent object:

In [None]:
learning_rate = 0.01
n_episodes = 1000
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1


agent = MazeAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

Then train the agent:

In [None]:
!pip install tqdm

In [None]:
# tqdm adds a nice progress bar to any loop
# unnecessary, but nice to have
from tqdm import tqdm

env = GridWorldEnv()
#env.render_mode="human"
for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False

    # play one episode
    while not done:
        #print(obs)
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)

        # update the agent
        agent.update(obs, action, reward, terminated, next_obs)

        # update if the environment is done and the current obs
        done = terminated or truncated
        obs = next_obs

    agent.decay_epsilon()
#print(agent.q_values["{'agent': array([1, 2]), 'target': array([4, 3])}"])

Let's check if the training worked. We will generate the actions using the trained agent:

In [None]:
env.render_mode="human"

obs, info = env.reset()
for _ in range(1000):
    action = agent.get_action(obs) 
    obs, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        obs, info = env.reset()

In [None]:
env.close()

## Extending the enviromnent

Using the gymnasium library, we can easily extend environments with additional features. For example, let's add walls to our environment. We will need to:

* Update the init() function to generate walls
* Update the step() function to penalize the agent running into walls
* Update the _render_frame() function to draw walls

In [None]:
class GridWorldWallsEnv(GridWorldEnv):
    def __init__(self, render_mode=None, size=5, num_walls = 1):
        super().__init__(render_mode, size)
        self.num_walls = num_walls
        
        # We now also generate walls by randomly sampling until we get empty locations
        self._wall_locations = []
        for i in range(self.num_walls):
            wall_location = self.generator.integers(0, self.size, size=2, dtype=int)
            while (np.array_equal(wall_location, self._agent_location) 
                   or np.array_equal(wall_location, self._target_location)
                   or any([np.array_equal(wall_location, x) for x in self._wall_locations])):
                    wall_location = self.generator.integers(0, self.size, size=2, dtype=int)
                    
            self._wall_locations.append(wall_location)
        
    def step(self, action):
       
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        # We use `np.clip` to make sure we don't leave the grid
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )
        # An episode is done iff the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        
        # Additionally, let's end the episode if the agent hits a wall
        terminated_wall = any([np.array_equal(self._agent_location, x) for x in self._wall_locations])
        if terminated:
            reward = 1
        elif terminated_wall:
            reward = -0.1
        else:
            reward = 0
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, reward, terminated or terminated_wall, False, info
    
    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # First we draw the target
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )
        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )
        
        # Now we draw the walls
        for wall_loc in self._wall_locations:
            pygame.draw.rect(
            canvas,
            (0, 0, 255),
            pygame.Rect(
                pix_square_size * wall_loc,
                (pix_square_size, pix_square_size),
            ),
        )
        

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,)
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,)

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2))
        

In [None]:
env = GridWorldWallsEnv(render_mode="human", num_walls = 12)
observation, info = env.reset()

In [None]:
for _ in range(1000):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

Since we're still using the gymnasium environment class, we can reuse the previous RL agent:

In [None]:
learning_rate = 0.01
n_episodes = 10000
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1


agent = MazeAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

env = GridWorldWallsEnv(num_walls = 12)

for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False
    while not done:
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)
        agent.update(obs, action, reward, terminated, next_obs)
        done = terminated or truncated
        obs = next_obs
    agent.decay_epsilon()


In [None]:
env.render_mode="human"
obs, info = env.reset()
for _ in range(1000):
    action = agent.get_action(obs) 
    obs, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        obs, info = env.reset()

## Additional environments

We can also apply our Q-learning class to other environments. Since it only requires the actions and rewards provided by the environment class, we can apply it to any gymnasium environment without modifications.

For example, let's see if it works on the built-in *LunarLander* environment: a physics-based game where the objective is to land a spacecraft on the moon. This environment is more complex than the maze but works on the same overall principle: for each step we give it an action which returns the next state + reward. The class is very well described in the documentation: https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/box2d/lunar_lander.py

In [None]:
#!pip install gymnasium[box2d]

Let's see how it works without training

In [None]:
env = gym.make("LunarLander-v2", render_mode="human")
observation, info = env.reset()

for _ in range(1000):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

env.close()

It seems to crash pretty quickly. Let's see if q-learning can improve this. This is the exact same code we used for the maze.

In [None]:
#env = gym.make("LunarLander-v2", render_mode=None)

learning_rate = 0.01
n_episodes = 20000
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1

"""
agent = MazeAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)
"""

env.unwrapped.render_mode = None

for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False

    # play one episode
    while not done:
        #print(obs)
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)

        # update the agent
        agent.update(obs, action, reward, terminated, next_obs)

        # update if the environment is done and the current obs
        done = terminated or truncated
        obs = next_obs

    agent.decay_epsilon()

In [None]:
observation, info = env.reset()
env.unwrapped.render_mode = "human"
for _ in range(1000):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)
    image = env.render()
    #print(image)

    if terminated or truncated:
        observation, info = env.reset()

In [None]:
env.close()