Value Iteration With Scala 2
Scala based implementation of the value iteration algorithm
In a previous post we saw how to implement the value iteration algorithm in Scala. This is particularly easy algorithm to implement. In that post, we used a dummy environment to check on the algorithm. In this post we extend our implementation by integrating ScalaPy in our implementation. This way we can interact with OpenAI Gym library which is written in Python. However, this way we have access to various environments to test our reinforcement learning algorithms.
Recall that one drawback of policy iteration is that each of its iterations involves a policy evaluation. This, however, may itself be an iterative computation; therefore requiring multiple sweeps through the state set [1]. Furthermore, if the evaluation is done iteratively, then convergence to $V_{\pi}$ occurs only in the limit [1].
Luckily, the policy evaluation step of policy iteration can truncated without loosing the convergence gurantees of the method. Moreover, this can be done in several different ways [1].
In particular, when policy evaluation is stopped after just one update of each state, the algorithm is called value iteration. It can be written as a particularly simple update operation that combines the policy improvement and truncated policy evaluation steps [1]
$$V_{k+1}(s) = max_{\alpha}\sum_{s^*, r}p(s^*, r | s, \alpha)\left[r + \gamma V_{k}(s^*)\right], ~~ \forall s \in \mathbb{S}$$
Figure 1: Value iteration algorithm. Image from [1].
The value iteration is obtained simply by turning the Bellman optimality equation into an update rule [1]. It requires the maximum to be taken over all the actions. Furthermore, the algorithm terminates by checking the amount of change of the value function.
OpenAI Gym provides a large collection of environments to test on reinforcement learning algorithms. The library is written in Python. Hence, we cannot use here as is. Fortunately, ScalaPy allows us to use Python libraries from Scala code. Check the ScalaPy how to install it on your machine.
Let's see how to implement a simple wrapper over the FrozenLake-v0 environment.
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
import scala.math.max
package engine.worlds
import me.shadaj.scalapy.py
class FrozenLake(val version: String) {
// load the gym module
private val gym = py.module("gym")
private var env: me.shadaj.scalapy.py.Dynamic = null
// Make the environment
def make: Unit = this.env = gym.make(this.name)
// Reset the environment
def reset: Int = {
val state = this.env.reset()
state.as[Int]
}
// Returns the name of the environment
def name: String = {"FrozenLake-" + version}
// Returns the number of states
def nStates: Int = this.env.observation_space.n.as[Int]
// Returns the number of actions
def nActions: Int = this.env.action_space.n.as[Int];
//
def getDynamics(state: Int, action: Int): Seq[(Double, Int, Double, Boolean)] = {
val P = py.Dynamic.global.dict(this.env.P)
val dynamicsTuple = P.bracketAccess(state)
val result = dynamicsTuple.bracketAccess(action).as[Seq[Tuple4[Double, Int,Double, Boolean]]]
result
}
}
This is very simple wrapper suitable though for our needs. It hides all the boilerplate code we need to have in order to interface with OpenAI Gym. Let's also change the implementation of the ValueIteration class.
The class has two implementation depending on the trainMode value defined below
package engine.rl
object TrainMode extends Enumeration {
val DEFAULT = Value(0, name = "DEFAULT")
val STOCHASTIC = Value(1, name = "STOCHASTIC")
}
The STOCHASTIC mode walks in the environment by randomly selecting an action. The DEFAULT implements the algorithm as we implemented in the previous related post.
package engine.rl
import scala.collection.mutable
import breeze.linalg.{DenseVector, max}
import engine.worlds.DiscreteEnvironment
import scala.util.control.Breaks.{break, breakable}
class ValueIteration(env: DiscreteEnvironment,
gamma: Double, maxIterations: Int, tolerance: Double,
trainMode: TrainMode.Value=TrainMode.DEFAULT) {
val rewards = new mutable.HashMap[Tuple3[Int, Int, Int], Double]()
val transits = new mutable.HashMap[Tuple3[Int, Int, Int], Int]
var stateValues = DenseVector.zeros[Double](env.nStates)
var residual = 1.0
def train: Unit = {
this.rewards.clear()
this.transits.clear()
this.stateValues = DenseVector.zeros[Double](env.nStates)
breakable {
for(itr <- Range(0, maxIterations)){
println("> Learning iteration " + itr)
println("> Learning residual " + residual)
step
if(residual < tolerance) break;
}
}
}
def step: Unit ={
// stop condition
var delta = 0.0
// update each state
for( s <- 0 until env.nStates){
// Do a one-step lookahead to find the best action
val value = oneStepLookahead(state=s)
val bestActionValue = breeze.linalg.max(value)
delta = math.max(delta, math.abs(bestActionValue - stateValues(s)))
stateValues(s) = bestActionValue
}
residual = delta
}
def defaultStep: Unit = {
}
def stochasticStep: Unit = {
}
protected def oneStepLookahead(state: Int): DenseVector[Double] = {
val values = DenseVector.zeros[Double](env.nActions)
for(action <- 0 until env.nActions) {
val dynamics = env.getDynamics(state = state, action = action)
for(item <- dynamics.indices){
val prob = dynamics(item)._1
val next_state = dynamics(item)._2
val reward = dynamics(item)._3
values(action) += prob * (reward + gamma * stateValues(next_state))
if(rewards.contains((state, action, next_state))){
rewards.update((state, action, next_state), reward)
transits.update((state, action, next_state), 1)
}
else{
rewards.addOne((state, action, next_state), reward)
transits.addOne((state, action, next_state), 1)
}
}
}
values
}
}
- Richard S. Sutton and Andrew G. Barto,
Reinforcement Learning: An Introduction.