Markov Decision Processes

MDP Components

Objective of MDP

$$\mathbb{E}\left[ \sum_{t=0}^T \gamma^t R(S_t, a_t) \right]$$

MDPs - World State

Planning Horizon & Information Availability

Policies

$$\max_\pi \mathbb{E} \left[\sum_{t=0}^{\infty} \gamma^t R(s_t, \pi(s_t))\right]$$

where $\pi(s_t)$ is the action selected by the policy at time step $t$.

Value of a Policy

Exercise 7.1

Consider the gridworld below:
An agent is currently on grid cell $s_2$, as indicated by the star, and would like to collect the rewards that lie on both sides of it. If the agent is on a numbered square (0, 5, or 10), the instance terminates and the agent receives a reward equal to the number on the square. On any other (non-numbered) square, its available actions are to move Left and Right. Note that Up and Down are never available actions. If the agent is in a square with an adjacent square below it, it does not always move successfully. When the agent is in one of these squares and takes a move action, it will only succeed with probability $p$. With probability $1-p$ the move action will fail and the agent will instead fall downwards into a trap. If the agent is not in a square with an adjacent space below, it will always move successfully.
  1. Consider the policy $\pi_R$, which is to always move right when possible. For each state $s\in\{s_1,s_2,s_3,s_4\}$ in the diagram above, give the value function $V^{\pi_R}$ in terms of $\gamma\in[0,1]$ and $p \in [0,1]$.
  2. Consider the policy $\pi_L$, which is to always move left when possible. For each state $s\in\{s_1,s_2,s_3,s_4\}$ in the diagram above, give the value function $V^{\pi_L}$ in terms of $\gamma\in[0,1]$ and $p \in [0,1]$.

Exercise 7.1a

We first recall that we are computing the value function at each state under the policy $\pi_R$, in which the agent always moves right when possible. We now recall the definition of the value function of an MDP under a fixed policy.

$$V^\pi(s)=\sum_{s'\in S} P(s'|\pi(s),s)\left[R(s,\pi(s),s')+\gamma V^\pi(s')\right]$$

From this, we get the result that:

$$\begin{aligned}
V^{\pi_R}(s_4) &= p[0 + \gamma \cdot 10] = 10p\gamma && \text{prob. of not falling is } p\text{; next state } s_5 \text{ has value } 10\\
V^{\pi_R}(s_3) &= p[0 + \gamma \cdot 10p\gamma] = 10p^2\gamma^2 && \text{prob. of not falling is } p\text{; next state } s_4 \text{ has value } 10p\gamma\\
V^{\pi_R}(s_2) &= 1\cdot[0 + \gamma \cdot 10p^2\gamma^2] = 10p^2\gamma^3 && \text{prob. of not falling is } 1\text{; next state } s_3 \text{ has value } 10p^2\gamma^2\\
V^{\pi_R}(s_1) &= 1\cdot[0 + \gamma \cdot 10p^2\gamma^3] = 10p^2\gamma^4 && \text{prob. of not falling is } 1\text{; next state } s_2 \text{ has value } 10p^2\gamma^3
\end{aligned}$$
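
As a quick numerical sanity check, the recurrence above can be evaluated for particular values of $p$ and $\gamma$. The snippet below is a standalone illustration (it does not use the tutorial support code, and the values of p and gamma are arbitrary examples):

# Evaluate the V^{pi_R} recurrence above for example values of p and gamma.
p, gamma = 0.9, 0.95

v = {'s5': 10.0}                      # terminal square worth 10
v['s4'] = p * (0 + gamma * v['s5'])   # trap below: move succeeds with probability p
v['s3'] = p * (0 + gamma * v['s4'])   # trap below: move succeeds with probability p
v['s2'] = 1 * (0 + gamma * v['s3'])   # no trap below: move always succeeds
v['s1'] = 1 * (0 + gamma * v['s2'])

assert abs(v['s1'] - 10 * p**2 * gamma**4) < 1e-9
print(v)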

Exercise 7.2a

Implement the attempt_move and get_transition_probabilities methods in the Grid class according to their docstrings. The aim is to have functions that can be used on an arbitrary grid-world (i.e. do not hard-code your function just for this problem instance.)

# Relevant fragments of the Grid support code:
ACTION_NAMES = ['UP', 'DOWN', 'LEFT', 'RIGHT']
self.states = list((x, y) for x in range(self.x_size) for y in range(self.y_size))

def attempt_move(self, s, a):
    """ Returns the state that results from attempting to apply action a in
    state s, accounting for the grid boundary, obstacles, reward squares and
    the absorbing exit state. """
    x, y = s
    # Check absorbing state
    if s == EXIT_STATE:
        return s
    # Reward squares transition to the exit state
    if s in self.rewards:
        return EXIT_STATE
    # Default: no movement
    result = s
    if a == LEFT and x > 0:
        result = (x - 1, y)
    if a == RIGHT and x < self.x_size - 1:
        result = (x + 1, y)
    if a == UP and y < self.y_size - 1:
        result = (x, y + 1)
    if a == DOWN and y > 0:
        result = (x, y - 1)
    # Check obstacle cells
    if result in OBSTACLES:
        return s
    return result
def stoch_action(self, a):
    """ Returns the probabilities with which each action will actually occur,
    given that action a was requested.
    Parameters:
    a: The action requested by the agent.
    Returns:
    The probability distribution over actual actions that may occur.
    """
    if a == RIGHT:
        return {RIGHT: self.p , UP: (1-self.p)/2, DOWN: (1-self.p)/2}
    elif a == UP:
        return {UP: self.p , LEFT: (1-self.p)/2, RIGHT: (1-self.p)/2}
    elif a == LEFT:
        return {LEFT: self.p , UP: (1-self.p)/2, DOWN: (1-self.p)/2}
    return {DOWN: self.p , LEFT: (1-self.p)/2, RIGHT: (1-self.p)/2}
def get_transition_probabilities(self, s, a):
    """ Returns a dict mapping each possible next state s' to P(s'|s,a),
    combining the stochastic action outcomes with attempt_move. """
    probabilities = {}
    for action, probability in self.stoch_action(a).items():
        next_state = self.attempt_move(s, action)
        probabilities[next_state] = probabilities.get(next_state, 0) + probability
    return probabilities

Exercise 7.2b

What is the one-step probability of arriving in each state $s'$ when starting from $[0,0]$, for each action $a$? (i.e. what is $P(s'|a,[0,0])\ \forall a, s'$)
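
A short driver for this question might look as follows (a sketch, assuming the Grid support code above with its action constants and a slip parameter of p = 0.8, as used in this tutorial). For each requested action it prints the distribution returned by stoch_action followed by the transition probabilities out of (0, 0), producing the raw output shown below.

grid = Grid()
for name, a in [('LEFT', LEFT), ('RIGHT', RIGHT), ('UP', UP), ('DOWN', DOWN)]:
    print("Action", name)
    # Distribution over the actions that actually occur when a is requested
    print(grid.stoch_action(a))
    # Resulting distribution over next states from (0, 0)
    print(grid.get_transition_probabilities((0, 0), a))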

Action indices: UP = 0, DOWN = 1, LEFT = 2, RIGHT = 3
Action LEFT
{2: 0.8, 0: 0.09999999999999998, 1: 0.09999999999999998}
{(0, 0): 0.9, (0, 1): 0.09999999999999998}
Action RIGHT
{3: 0.8, 0: 0.09999999999999998, 1: 0.09999999999999998}
{(1, 0): 0.8, (0, 1): 0.09999999999999998, (0, 0): 0.09999999999999998}
Action UP
{0: 0.8, 2: 0.09999999999999998, 3: 0.09999999999999998}
{(0, 1): 0.8, (0, 0): 0.09999999999999998, (1, 0): 0.09999999999999998}
Action DOWN
{1: 0.8, 2: 0.09999999999999998, 3: 0.09999999999999998}
{(0, 0): 0.9, (1, 0): 0.09999999999999998}

In other words:

P((1, 0)|right,(0, 0)) = 0.8
P((0, 1)|right,(0, 0)) = P((0, 0)|right,(0, 0)) = 0.1
P((0, 1)|up,(0, 0)) = 0.8
P((1, 0)|up,(0, 0)) = P((0, 0)|up,(0, 0)) = 0.1
P((0, 0)|left,(0, 0)) = 0.9
P((0, 1)|left,(0, 0)) = 0.1
P((0, 0)|down,(0, 0)) = 0.9
P((1, 0)|down,(0, 0)) = 0.1

Exercise 7.2c

What is the one-step probability of arriving in state $[1,0]$ from each state $s$ after taking action $a$? That is, what is $P([1,0]|a,s)\ \forall (a,s)$.

We can once again call the get_transition_probabilities function, iterating over all possible state and action pairs (a sketch of such a loop is given after the results below). From this, we get the following result:

P((1, 0)|right,(0, 0)) = 0.8
P((1, 0)|up,(0, 0)) = 0.1
P((1, 0)|down,(0, 0)) = 0.1
P((1, 0)|left,(2, 0)) = 0.8
P((1, 0)|up,(2, 0)) = 0.1
P((1, 0)|down,(2, 0)) = 0.1
P((1, 0)|up,(1, 0)) = 0.8
P((1, 0)|down,(1, 0)) = 0.8
P((1, 0)|left,(1, 0)) = 0.2
P((1, 0)|right,(1, 0)) = 0.2
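
A sketch of the loop described above (again assuming the Grid support code; ACTION_NAMES is the list shown in Exercise 7.2a and OBSTACLES holds the blocked cells):

grid = Grid()
target = (1, 0)
for s in grid.states:
    if s in OBSTACLES:
        continue  # skip obstacle cells; they are unreachable
    for a in grid.actions:
        prob = grid.get_transition_probabilities(s, a).get(target, 0)
        if prob > 0:
            print(f"P({target}|{ACTION_NAMES[a].lower()}, {s}) = {prob}")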

Exercise 7.3

Implement VI for this problem, using the equation

$$V[s]\leftarrow \max_a \sum_{s'} P(s'|s,a)\left(R(s,a,s')+\gamma V[s']\right)$$

Note that $R(s,a,s')=R(s)$ is the reward for landing on a square, which is non-zero only for the red and blue squares located at $[3,1]$ and $[3,2]$ respectively.

a) What is the value function estimate after 4 iterations? What is the policy according to the value function estimate after 4 iterations?

b) What is the value function estimate after 10 iterations? What is the policy according to the value function estimate after 10 iterations?

The equation given below is a version of the Bellman update equation for Value Iteration.

$$V[s]\leftarrow \max_a \sum_{s'} P(s'|s,a)\left(R(s,a,s')+\gamma V[s']\right)$$

To implement Value Iteration, for each state $s$ and each action $a$ we want to compute $\sum_{s'} P(s'|a,s) \left[R(s,a,s')+\gamma V[s']\right]$, summing over each possible next state $s'$, and then take the maximum over actions.

We first define some functions and constants that will help with the implementation of Value Iteration. The dict_argmax function will be used to extract the maximising action, i.e. the $\arg\max_a$ operation (the corresponding maximum value gives the new state value).

EPSILON = 0.0001

def dict_argmax(d):
    max_value = max(d.values())
    for k, v in d.items():
        if v == max_value:
            return k
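
For example (a quick illustration, using the action indices UP = 0, DOWN = 1, LEFT = 2, RIGHT = 3):

# Returns the key whose value is largest; ties are broken by iteration order.
assert dict_argmax({0: 0.2, 1: 0.1, 2: 0.05, 3: 0.7}) == 3   # i.e. RIGHT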

We then create a class containing all of the information we want to keep track of in our VI implementation.

class ValueIteration:
    def __init__(self, grid):
        self.grid = grid
        self.values = {state: 0 for state in self.grid.states}
        self.policy = {state: RIGHT for state in self.grid.states}
        self.converged = False
        self.differences = []

    def next_iteration(self):
        new_values = dict()
        new_policy = dict()

        for s in self.grid.states:
            # Keep track of maximum value
            action_values = dict()
            for a in self.grid.actions:
                total = 0
                for stoch_action, p in self.grid.stoch_action(a).items():
                    # Apply action
                    s_next = self.grid.attempt_move(s, stoch_action)
                    total += p * (self.grid.get_reward(s)
                                + (self.grid.discount * self.values[s_next]))
                action_values[a] = total
            # Update state value with best action
            new_values[s] = max(action_values.values())
            new_policy[s] = dict_argmax(action_values)
        # Check convergence
        differences = [abs(self.values[s] - new_values[s]) for s in self.grid.states]
        max_diff = max(differences)
        self.differences.append(max_diff)

        if max_diff < EPSILON:
            self.converged = True

        # Update values
        self.values = new_values
        self.policy = new_policy

We can then write some code to run VI and report the values and policy after each iteration (MAX_ITER is an iteration cap, and print_values / print_values_and_policy are simple display helpers, omitted here for brevity).

import time

def run_vi():
    grid = Grid()
    vi = ValueIteration(grid)

    start = time.time()
    print("Initial values:")
    vi.print_values()
    print()

    for i in range(MAX_ITER):
        vi.next_iteration()
        print("Values after iteration", i + 1)
        vi.print_values_and_policy()
        print()
        if vi.converged:
            break

    end = time.time()
    print("Time to complete", i + 1, "VI iterations")
    print(end - start)

From this, we get the values and policy after 4 iterations:

Values and Policies after iteration 4
(0, 0) UP 0.0
(0, 1) UP 0.0
(0, 2) RIGHT 0.3732480000000001
(1, 0) UP 0.0
(1, 2) RIGHT 0.6583680000000002
(2, 0) UP 0.046656
(2, 1) LEFT 0.11728799999999999
(2, 2) RIGHT 0.7964640000000001
(3, 0) DOWN 0.0
(3, 1) UP -100.0
(3, 2) UP 1.0
(-1, -1) UP 0.0
Converged: False

And after 10 iterations

Values and Policies after iteration 10
(0, 0) UP 0.4490637007006404
(0, 1) UP 0.5362371998424762
(0, 2) RIGHT 0.61632756154903
(1, 0) LEFT 0.3679911227699528
(1, 2) RIGHT 0.7155133495934718
(2, 0) LEFT 0.28052219829783076
(2, 1) LEFT 0.28600606577514903
(2, 2) RIGHT 0.8174373191274608
(3, 0) DOWN 0.05225467158005328
(3, 1) UP -100.0
(3, 2) UP 1.0
(-1, -1) UP 0.0
Converged: False

And after 40 iterations

Values and Policies after iteration 40
(0, 0) UP 0.4800323382261456
(0, 1) UP 0.5540265799556026
(0, 2) RIGHT 0.6309786313152921
(1, 0) LEFT 0.42148665011938496
(1, 2) RIGHT 0.728236805418173
(2, 0) LEFT 0.37165369571437096
(2, 1) LEFT 0.38600516990982364
(2, 2) RIGHT 0.8293834149435776
(3, 0) DOWN 0.17564736007905382
(3, 1) UP -100.0
(3, 2) UP 1.0
(-1, -1) UP 0.0
Converged: True
From this, we can see that as the number of iterations increases, the change in the value estimates between successive iterations decreases (the values converge). These values propagate outwards from the states in which the agent receives a reward.

Exercise 7.4

How does Policy Iteration work?

Very broadly, the pseudocode for Policy Iteration is given as:

while not converged:  # Convergence criteria - (new_policy == policy)
    # policy_evaluation uses linear algebra to derive a set of values
    # as shown above
    values <- policy_evaluation(policy)
    # compute a new policy using the previously computed set of values
    # Looks like value iteration, but only a single iteration is performed.
    new_policy <- policy_improvement(values)
    if new_policy == policy:
        # Convergence criteria matched, return the policy
        return policy
    # Convergence criteria not matched, set the policy and perform
    # another iteration
    policy <- new_policy
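
The linear-algebra form of policy evaluation referenced in the pseudocode follows from the fact that, for a fixed policy $\pi$, the value function satisfies

$$V^\pi = R + \gamma P^\pi V^\pi \quad\Longrightarrow\quad (I-\gamma P^\pi)V^\pi = R,$$

so $V^\pi$ can be obtained with a single linear solve. This is exactly what the np.linalg.solve call in the implementation below does.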

To implement Policy Iteration, we begin by creating a class to store the required values, as with Value Iteration.

import numpy as np

class PolicyIteration:
    def __init__(self, grid):
        self.grid = grid
        self.values = {state: 0 for state in self.grid.states}
        self.policy = {state: RIGHT for state in self.grid.states}
        self.r = [0 for s in self.grid.states]
        self.USE_LIN_ALG = False
        self.converged = False

        # Full transition matrix (P) of dimensionality |S|x|A|x|S|, since it's
        # not specific to any one policy. We'll slice out an |S|x|S| matrix
        # from it for each policy evaluation.
        self.t_model = np.zeros([len(self.grid.states), len(self.grid.actions),
                                    len(self.grid.states)])
        for i, s in enumerate(self.grid.states):
            for j, a in enumerate(self.grid.actions):
                transitions = self.grid.get_transition_probabilities(s, a)
                for next_state, prob in transitions.items():
                    self.t_model[i][j][self.grid.states.index(next_state)] = prob

        # Reward vector
        r_model = np.zeros([len(self.grid.states)])
        for i, s in enumerate(self.grid.states):
            r_model[i] = self.grid.get_reward(s)
        self.r_model = r_model

        # lin alg policy
        la_policy = np.zeros([len(self.grid.states)], dtype=np.int64)
        for i, s in enumerate(self.grid.states):
            la_policy[i] = 3  # arbitrary initial policy: action index 3 (RIGHT), matching self.policy
        self.la_policy = la_policy


    def next_iteration(self):
        self.policy_evaluation()
        new_policy = self.policy_improvement()
        self.convergence_check(new_policy)

    def policy_evaluation(self):
        if not self.USE_LIN_ALG:
            # use 'naive'/iterative policy evaluation
            value_converged = False
            while not value_converged:
                new_values = dict()
                for s in self.grid.states:
                    total = 0
                    for stoch_action, p in self.grid.stoch_action(self.policy[s]).items():
                        # Apply action
                        s_next = self.grid.attempt_move(s, stoch_action)
                        total += p * (self.grid.get_reward(s) + (self.grid.discount * self.values[s_next]))
                    # Update state value with best action
                    new_values[s] = total

                # Check convergence
                differences = [abs(self.values[s] - new_values[s]) for s in self.grid.states]
                if max(differences) < EPSILON:
                    value_converged = True
                # Update values and policy
                self.values = new_values
        else:
            # use linear algebra for policy evaluation
            # V^pi = R + gamma T^pi V^pi
            # (I - gamma * T^pi) V^pi = R
            # Ax = b; A = (I - gamma * T^pi), b = R
            state_numbers = np.array(range(len(self.grid.states))) # indices of every state
            t_pi = self.t_model[state_numbers, self.la_policy]
            values = np.linalg.solve(np.identity(len(self.grid.states)) - (self.grid.discount * t_pi), self.r_model)
            self.values = {s: values[i] for i, s in enumerate(self.grid.states)}

    def policy_improvement(self):
        if self.USE_LIN_ALG:
            new_policy = {s: self.grid.actions[self.la_policy[i]] for i, s in enumerate(self.grid.states)}
        else:
            new_policy = {}

            for s in self.grid.states:
                # Keep track of maximum value
                action_values = dict()
                for a in self.grid.actions:
                    total = 0
                    for stoch_action, p in self.grid.stoch_action(a).items():
                        # Apply action
                        s_next = self.grid.attempt_move(s, stoch_action)
                        total += p * (self.grid.get_reward(s) + (self.grid.discount * self.values[s_next]))
                    action_values[a] = total
                # Update policy
                new_policy[s] = dict_argmax(action_values)
        return new_policy

    def convergence_check(self, new_policy):
        if new_policy == self.policy:
            self.converged = True
        self.policy = new_policy
        if self.USE_LIN_ALG:
            for i, s in enumerate(self.grid.states):
                self.la_policy[i] = self.policy[s]

The choice of initial policy affects how many iterations it takes to reach convergence.
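
A driver analogous to run_vi above can be used to run Policy Iteration to convergence (a sketch only; it reuses time and MAX_ITER from the Value Iteration code, and prints the final policy and values directly since PolicyIteration defines no print helpers):

def run_pi():
    grid = Grid()
    pi = PolicyIteration(grid)

    start = time.time()
    for i in range(MAX_ITER):
        pi.next_iteration()
        if pi.converged:
            break
    end = time.time()

    print("Policy Iteration converged after", i + 1, "iterations")
    for s in grid.states:
        print(s, pi.policy[s], pi.values[s])
    print("Time to complete:", end - start)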

Exercise 7.4b

Let $P^\pi\in \mathbb{R}^{|S|\times|S|}$ be a matrix containing the probabilities for each transition under some policy $\pi$, where

$$P^\pi_{ij}=P(s_{t+1}=j|s_t=i,a_t=\pi(s_t))$$

Calculate the size of $P^\pi$ in this grid world, where the special pseudo state is included.

If we exclude the obstacle at (1,1) as an unreachable state but include the exit state, then the size of $P^\pi$ is 12x12. Including the obstacle brings the size to 13x13, but the transition probability into (1,1) is 0 from all states.

Exercise 7.4c

Set the policy to move right everywhere, $\pi(s)=\text{RIGHT}\ \forall s \in S$. Calculate the row of $P^\text{RIGHT}$ corresponding to the initial state at the bottom-left corner, (0,0), of the gridworld.

Let the state indices be ordered:

$$((0,0),(1,0),(2,0),(3,0),(0,1),(2,1),(3,1),(0,2),(1,2),(2,2),(3,2),\text{EXIT\_STATE})$$

The row is $P^{\text{RIGHT}}_{((0,0),\,j)}=[0.1\ \ 0.8\ \ 0\ \ 0\ \ 0.1\ \ 0\ \ 0\ \ 0\ \ 0\ \ 0\ \ 0\ \ 0]$

i.e. entry $j$ of this row is the probability of moving from state (0,0) to state $j = s'$ when action RIGHT is performed:

$P((0,0)|(0,0), \text{RIGHT})=0.1$

Starting in state (0,0) and remaining in (0,0) under action RIGHT has probability 0.1; this is the perpendicular DOWN slip, which is blocked by the grid boundary, so the agent stays in place.

$P((1,0)|(0,0), \text{RIGHT})=0.8$

Starting in state (0,0) and moving to state (1,0) under action RIGHT has probability 0.8; this is the chosen direction.

$P((0,1)|(0,0), \text{RIGHT})=0.1$

Starting in state (0,0) and moving to state (0,1) under action RIGHT has probability 0.1; this is the perpendicular UP slip.

Exercise 7.4d

Write a function to calculate the row of $P^\text{RIGHT}$ corresponding to an initial state at the bottom-left corner, (0,0), of the gridworld for any action or deterministic $\pi((0,0))$.
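
One possible approach (a sketch, not the official solution) is a helper that reuses get_transition_probabilities to build the row of the transition matrix for a given state, action and state ordering; transition_row and state_order are hypothetical names, and numpy is assumed to be imported as np.

def transition_row(grid, state_order, s, a):
    """ Returns the row of the transition matrix for state s under action a,
    using the given ordering of states (e.g. the ordering listed in 7.4c). """
    row = np.zeros(len(state_order))
    for s_next, prob in grid.get_transition_probabilities(s, a).items():
        row[state_order.index(s_next)] += prob
    return row

# Example: the row for the bottom-left corner under the policy "always RIGHT"
# (state_order must contain every reachable state, including EXIT_STATE).
# print(transition_row(grid, state_order, (0, 0), RIGHT))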