からっぽのしょこ

読んだら書く!書いたら読む!同じ事は二度調べ(たく)ない

5.3:モンテカルロ法による方策評価の実装【ゼロつく4のノート】

はじめに

 『ゼロから作るDeep Learning 4 ――強化学習編』の独学時のまとめノートです。初学者の補助となるようにゼロつくシリーズの4巻の内容に解説を加えていきます。本と一緒に読んでください。

 この記事は、5.3節の内容です。モンテカルロ法による方策評価(状態価値関数の推定)を実装します。

【前節の内容】

www.anarchive-beta.com

【他の記事一覧】

www.anarchive-beta.com

【この記事の内容】

5.3 モンテカルロ法の実装

 モンテカルロ法に対応したエージェントを実装して、3×4マスのグリッドワールド(図4-8)に対してモンテカルロ法により状態価値関数を求めます。

 利用するライブラリを読み込みます。

# ライブラリを読み込み
import numpy as np
from collections import defaultdict

# 追加ライブラリ
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.animation import FuncAnimation

 推移をアニメーションで確認するのにanimationモジュールを利用します。不要であれば省略してください。

 また、3×4マスのグリッドワールドのクラスGridWorldを読み込みます。

# 実装済みのクラスと関数を読み込み
import sys
sys.path.append('../deep-learning-from-scratch-4-master')
from common.gridworld import GridWorld

 実装済みクラスの読み込みについては「3.6.1:MNISTデータセットの読み込み【ゼロつく1のノート(Python)】 - からっぽのしょこ」、GridWorldクラスについては「4.2.1:GridWorldクラスの実装:評価と改善に関するメソッド【ゼロつく4のノート】 - からっぽのしょこ」「4.2.1:GridWorldクラスの実装:可視化に関するメソッド【ゼロつく4のノート】 - からっぽのしょこ」を参照してください。

5.3.2 エージェントクラスの実装

 モンテカルロ法により状態価値関数の計算(方策の評価)を行うエージェントを実装します。

・処理の確認

 RandomAgentクラスの内部で行う処理を確認します。

・サンプリング

 状態・行動・報酬のサンプリングに関する処理を確認します。initメソッド・get_actionメソッド・addメソッド・resetメソッドに対応します。

 全ての方策を格納するディクショナリを作成します。

# 確率論的方策用の確率分布を指定
random_actions = {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25}

# 確率論的方策を初期化
pi = defaultdict(lambda: random_actions)
print(pi.items())
dict_items([])

 各状態(マス)の確率論的方策の初期値(全ての行動で等しい確率)をディクショナリrandom_actionsとして指定します。
 全ての状態の確率論的方策を格納するディクショナリpidefaultdict()を使って作成します。

 状態(マス)を指定して、確率論的方策(確率分布)を取り出します。

# 状態を指定(キーを作成)
state = (1, 1)

# 指定した状態の方策を取得
action_probs = pi[state]
print(action_probs)
print(pi.items())
{0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25}
dict_items([((1, 1), {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25})])

 設定されていないキーを指定するとデフォルトの値(random_actions)が出力されます。また、指定したキーに対応する値としてディクショナリに格納されます。

 確率論的方策(確率分布)から行動番号と行動確率を取り出します。

# 行動番号を取得
actions = list(action_probs.keys())

# 行動確率を取得
probs = list(action_probs.values())
print(actions)
print(probs)
[0, 1, 2, 3]
[0.25, 0.25, 0.25, 0.25]

 行動番号(キー)をkeys()、行動確率(値)をvalues()で出力します。

 確率論的方策に従ってランダムに行動を決定します。

# 確率論的方策によりランダムに行動を決定
action = np.random.choice(a=actions, p=probs)
print(action)
0

 全ての行動actionsから確率probsに従い行動actionnp.random.choice()で生成します。

 ここまでで、ある状態stateにおいて方策pi[state]に従い行動を出力するエージェントの処理を確認しました。現在の状態と行動から環境(GridWorldクラス)の処理によって報酬が得られます。
 モンテカルロ法では、状態・行動・報酬をサンプルデータとして扱います。1エピソードで得られたサンプルを用いて、状態価値関数や行動価値関数を推定します。逐次計算を行うことで、サンプルの情報は状態価値として形を変えて保存され、過去のデータは不要になります。

 サンプリングされたデータ(状態・行動・報酬)をステップごとに記録します。ここでは、処理の内容を簡単に再現して確認します。

# サンプルデータの記録用のリストを初期化
memory = []

# 試行回数(時刻の最大値)を指定
T = 3

# (ダミーの)サンプルデータを格納
for i in range(T):
    # ダミーデータを作成
    data = ((i, i), i, i)
    
    # サンプルを格納
    memory.append(data)
print(memory)
[((0, 0), 0, 0), ((1, 1), 1, 1), ((2, 2), 2, 2)]

 状態(マスのインデックス)・行動番号・報酬をタプルにまとめて、リストに格納していきます。
 エピソードが終了すると、格納されたサンプルデータを用いて状態価値関数を計算します。

 状態価値関数を更新して次のエピソードに移る際に、メソッドを使ってリストの要素を削除します。

# 記録用のリストを初期化
memory.clear()
print(memory)
[]

 リスト型オブジェクトのメソッドclear()でリストを初期化できます。

・状態価値関数の計算メソッド

 続いて、状態価値関数の推定(方策評価)に関する処理を確認します。evalメソッドに対応します。

 状態価値関数の推定の処理を確認するには、サンプルを得るためにエージェントと環境のやり取りを行う必要があります。分かりやすいように、状態価値の計算の処理を簡単に再現します。

# 全ての状態を設定
states = [(0, 0), (1, 1), (2, 2)]

# 割引率を指定
gamma = 0.9

# 状態価値関数を初期化
V = {state: 0.0 for state in states}
print(list(V.keys()))

# 遷移回数を初期化
cnts = {state: 0 for state in states}

# 収益を初期化
G = 0

# 試行回数(時刻の最大値)を指定
T = 5

# サンプルごとに状態価値を更新
for t in range(T):
    # ダミーの状態を出力
    idx = np.random.choice(np.arange(len(states)))
    state = states[idx]
    
    # ダミーの報酬を出力
    reward = 1.0
    
    # 時刻tの収益を計算:式(3.3)
    G = gamma * G + reward
    
    # 遷移回数をカウント
    cnts[state] += 1
    
    # 状態価値関数を計算:式(5.2')
    V[state] += (G - V[state]) / cnts[state]
    
    # 途中経過を表示
    print('--- t=' + str(t+1) + ', state:' + str(state) + ' ---')
    print(np.round(list(V.values()), 2))
    print(list(cnts.values()))
[(0, 0), (1, 1), (2, 2)]
--- t=1, state:(2, 2) ---
[0. 0. 1.]
[0, 0, 1]
--- t=2, state:(0, 0) ---
[1.9 0.  1. ]
[1, 0, 1]
--- t=3, state:(0, 0) ---
[2.3 0.  1. ]
[2, 0, 1]
--- t=4, state:(0, 0) ---
[2.68 0.   1.  ]
[3, 0, 1]
--- t=5, state:(2, 2) ---
[2.68 0.   2.55]
[3, 0, 2]

 (行動を伴わずにランダムに3種類の状態を遷移して、必ず報酬1が得られるものとして、状態価値を計算しています。)

 前ステップの収益G・割引率gamma・報酬rewardを用いて式(3.3)の計算を行い、収益Gを計算(更新)します。
 各状態への遷移回数をディクショナリcntsに記録します。サンプルごとに、現在の状態stateをキーとして値に1を加えることで、その状態の遷移回数をカウントします。状態が変わらない場合もカウントします。
 各状態の状態価値をディクショナリVに格納します。収益G・状態価値V[state]・遷移回数cnts[state]を用いて式(5.2)の計算を行い、状態価値を計算(更新)します。

 実際の推定では、1エピソード分のサンプルをmemoryに格納しておき、サンプルデータをリストの後から取り出して、同様に処理します。後から計算する理由については「5.2:モンテカルロ法による方策評価【ゼロつく4のノート】 - からっぽのしょこ」を参照してください。

 以上が、モンテカルロ法による方策評価を行うエージェントの処理です。

・実装

 処理の確認ができたので、モンテカルロ法によるエージェントの機能をクラスとして実装します。

# モンテカルロ法によるエージェントの実装
class RandomAgent:
    # 初期化メソッドの定義
    def __init__(self):
        # 値の設定
        self.gamma = 0.9 # 割引率
        self.action_size = 4 # 行動の種類数
        
        # オブジェクトの初期化
        random_actions = {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25} # 確率論的方策用の確率分布
        self.pi = defaultdict(lambda: random_actions) # 確率論的方策
        self.V = defaultdict(lambda: 0) # 状態価値関数
        self.cnts = defaultdict(lambda: 0) # 遷移回数
        self.memory = [] # サンプルデータ
    
    # 行動メソッドの定義
    def get_action(self, state):
        # 現在の状態における方策の確率分布を取得
        action_probs = self.pi[state]
        actions = list(action_probs.keys()) # 行動番号
        probs = list(action_probs.values()) # 行動確率
        
        # 確率論的方策による行動を出力
        return np.random.choice(actions, p=probs)
    
    # 記録メソッドの定義
    def add(self, state, action, reward):
        # 現在の時刻におけるサンプルデータをタプルに格納
        data = (state, action, reward)
        
        # サンプルを格納
        self.memory.append(data)
    
    # 記録の初期化メソッドの定義
    def reset(self):
        # 記録用のリストを初期化
        self.memory.clear()
    
    # 状態価値関数の計算メソッドの定義
    def eval(self):
        # 状態価値関数を計算
        G = 0
        for data in reversed(self.memory):
            # 各時刻におけるサンプルデータを取得
            state, action, reward = data
            
            # 収益を計算:式(3.3)
            G = self.gamma * G + reward
            
            # 遷移回数をカウント
            self.cnts[state] += 1
            
            # 状態価値関数を計算:式(5.2')
            self.V[state] += (G - self.V[state]) / self.cnts[state]


 実装したクラスを試してみましょう。

 環境(グリッドワールド)とエージェントのインスタンスを作成します。

# 環境とエージェントのインスタンスを作成
env = GridWorld()
agent = RandomAgent()

# 最初の状態を取得
state = env.start_state
print(state)

# 1エピソードのシミュレーション
while True:
    # 確率論的方策に従い行動を決定
    action = agent.get_action(state)

    # サンプルデータを取得
    next_state, reward, done = env.step(action)

    # サンプルデータを格納
    agent.add(state, action, reward)

    # ゴールに着いた場合
    if done:
        # ループを終了
        break

    # 状態を更新
    state = next_state
print(next_state)
(2, 0)
(0, 3)

 agentget_action()で方策に従い行動して、envstep()で状態を遷移し報酬を出力します。現在の状態・行動・報酬をagentadd()で記録します。
 ゴールマスに着くとdoneTrueになるので、breakでループ処理を終了します。

 サンプルデータがmemoryに格納されているのを確認します。

# サンプルデータを確認
print(agent.memory[:5])
print(len(agent.memory))
[((2, 0), 3, 0), ((2, 1), 2, 0), ((2, 0), 3, 0), ((2, 1), 1, 0), ((2, 1), 1, 0)]
10


 eval()メソッドで状態価値関数を計算します。

# 状態価値関数を確認
print(agent.V.values())

# 状態価値関数を計算
agent.eval()

# 状態価値関数を確認
print(list(agent.V.keys()))
print(np.round(list(agent.V.values()), 4))
dict_values([])
[(0, 2), (1, 2), (2, 2), (2, 1), (2, 0)]
[1.     0.855  0.729  0.5521 0.4329]

 サンプルとして得られなかった状態はagent.Vに含まれません。

 reset()でサンプルデータを削除します。

# サンプルデータを削除
agent.reset()
print(agent.memory)
[]


 この項では、エージェントを実装しました。次項では、状態価値関数を推定します。

5.3.3 モンテカルロ法を動かす

 3×4マスのグリッドワールド(図4-8)に対してモンテカルロ法により状態価値関数を求めます。

・推定

 マルコフ法により状態価値関数を推定(方策を評価)します。

# インスタンスを作成
env = GridWorld()
agent = RandomAgent()

# エピソード数を指定
episodes = 1000

# 推移の可視化用のリストを初期化
trace_V = [{state: agent.V[state] for state in env.states()}] # 初期値を記録

# 繰り返しシミュレーション
for episode in range(episodes):
    # 状態を初期化(エージェントの位置をスタートマスに設定)
    state = env.reset()
    
    # サンプルデータを初期化
    agent.reset()
    
    # 試行回数(時刻)を初期化
    t = 0
    
    # 1エピソードのシミュレーション
    while True:
        # 試行回数をカウント
        t += 1
        
        # 確率論的方策に従い行動を決定
        action = agent.get_action(state)
        
        # サンプルデータを取得
        next_state, reward, done = env.step(action)
        
        # サンプルデータを格納
        agent.add(state, action, reward)
        
        # ゴールに着いた場合
        if done:
            # 状態価値関数を計算
            agent.eval()
            
            # 更新値を記録
            trace_V.append(agent.V.copy())
            
            # 試行回数を表示
            print('episode '+str(episode+1) + ': T='+str(t))
            
            # ループを中断して次のエピソードへ
            break
        
        # 状態を更新
        state = next_state
episode 1: T=14
episode 2: T=56
episode 3: T=15
episode 4: T=43
episode 5: T=32
(省略)
episode 996: T=19
episode 997: T=32
episode 998: T=99
episode 999: T=42
episode 1000: T=35

 スタートマスからランダムに行動し、ゴールマスに着くまでを1エピソードとします。エピソードごとに、GridWorldクラスのreset()メソッドで状態を初期化し(エージェントをスタートマスに戻し)、RandomAgentクラスのreset()メソッドでサンプルデータを初期化(過去のデータを削除)します。
 episodesに指定した回数のシミュレーションを行い、繰り返し状態価値関数を更新します。

 推移の確認用に、状態価値関数の更新値をtrace_Vに格納していきます。

 推定した状態価値関数をヒートマップで確認します。

# 状態価値関数のヒートマップを作図
env.render_v(v=agent.V, policy=agent.pi)

モンテカルロ法により推定した状態価値関数のヒートマップ

 結果の解釈については本を参照してください。

・更新推移の可視化

 ここまでで、繰り返しの更新処理を確認しました。続いて、途中経過をアニメーションで確認します。

 状態価値関数のヒートマップのアニメーションを作成します。

・作図コード(クリックで展開)

# グリッドマップのサイズを取得
xs = env.width
ys = env.height

# 状態価値の最大値・最小値を取得
vmax = max([max(trace_V[i].values()) for i in range(len(trace_V))])
vmin = min([min(trace_V[i].values()) for i in range(len(trace_V))])

# 色付け用に最大値・最小値を再設定
vmax = max(vmax, abs(vmin))
vmin = -1 * vmax
vmax = 1 if vmax < 1 else vmax
vmin = -1 if vmin > -1 else vmin

# カラーマップを設定
color_list = ['red', 'white', 'green']
cmap = LinearSegmentedColormap.from_list('colormap_name', color_list)

# 図を初期化
fig = plt.figure(figsize=(12, 9), facecolor='white') # 図の設定
plt.suptitle('Monte Carlo Method', fontsize=20) # 全体のタイトル

# 作図処理を関数として定義
def update(i):
    # 前フレームのグラフを初期化
    plt.cla()
    
    # i回目の更新値を取得
    pi = agent.pi
    V = trace_V[i]
    
    # ディクショナリを配列に変換
    v = np.zeros((env.shape))
    for state, value in V.items():
        v[state] = value
        
    # 状態価値のヒートマップを描画
    plt.pcolormesh(np.flipud(v), cmap=cmap, vmin=vmin, vmax=vmax) # ヒートマップ
    
    # マス(状態)ごとに処理
    for state in env.states():
        # インデックスを取得
        y, x = state
        
        # 報酬を抽出
        r = env.reward_map[state]
        
        # 報酬がある場合
        if r != 0 and r is not None:
            # 報酬ラベル用の文字列を作成
            txt = 'R ' + str(r)
            
            # ゴールの場合
            if state == env.goal_state:
                # 報酬ラベルにゴールを追加
                txt = txt + ' (GOAL)'
            
            # 報酬ラベルを描画
            plt.text(x=x+0.1, y=ys-y-0.9, s=txt, 
                     ha='left', va='bottom', fontsize=15)
            
        # 壁以外の場合
        if state != env.wall_state:
            # 状態価値ラベルを描画
            plt.text(x=x+0.9, y=ys-y-0.1, s=str(np.round(v[y, x], 2)), 
                     ha='right', va='top', fontsize=15)
            
            # 確率論的方策を抽出
            actions = pi[state]
            
            # 確率が最大の行動を抽出
            max_actions = [k for k, v in actions.items() if v == max(actions.values())]
            
            # 矢印の描画用のリストを作成
            arrows = ['↑', '↓', '←', '→']
            offsets = [(0, 0.1), (0, -0.1), (-0.1, 0), (0.1, 0)]
            
            # 行動ごとに処理
            for action in max_actions:
                # 矢印の描画用の値を抽出
                arrow = arrows[action]
                offset = offsets[action]
                
                # ゴールの場合
                if state == env.goal_state:
                    # 描画せず次の状態へ
                    continue
                
                # 方策ラベル(矢印)を描画
                plt.text(x=x+0.5+offset[0], y=ys-y-0.5+offset[1], s=arrow, 
                         ha='center', va='center', size=20)
        
        # 壁の場合
        if state == env.wall_state:
            # 壁を描画
            rect = plt.Rectangle(xy=(x, ys-y-1), width=1, height=1, 
                                 fc=(0.4, 0.4, 0.4, 1.0)) # 長方形を作成
            plt.gca().add_patch(rect) # 重ねて描画
    
    # グラフの設定
    plt.xticks(ticks=np.arange(xs)) # x軸の目盛位置
    plt.yticks(ticks=np.arange(ys), labels=ys-np.arange(ys)-1) # y軸の目盛位置
    plt.xlim(xmin=0, xmax=xs) # x軸の範囲
    plt.ylim(ymin=0, ymax=ys) # y軸の範囲
    plt.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False) # 軸ラベル
    plt.grid() # グリッド線
    plt.title('iter:'+str(i), loc='left') # タイトル

# gif画像を作成
anime = FuncAnimation(fig=fig, func=update, frames=len(trace_V), interval=100)

# gif画像を保存
anime.save('ch5_3_3.gif')


状態価値関数のヒートマップの推移

 各エピソードで更新した状態価値をtrace_Vから取り出してヒートマップを描画する処理を関数update()として定義して、FuncAnimation()でアニメーション(gif画像)を作成します。

 状態価値関数の更新値の推移を折れ線グラフで確認します。

・作図コード(クリックで展開)

# 状態数を取得
state_size = env.reward_map.size

# 作図用の配列を作成
trace_V_arr = np.empty((episodes+1, state_size))
for i in range(episodes+1):
    # i回目の更新値を抽出
    V = trace_V[i]
    
    # 配列に格納
    trace_V_arr[i] = [V[state] for state in env.states()]

# 全ての状態を取得
states = list(agent.V.keys())

# 状態価値関数の推移を作図
plt.figure(figsize=(12, 10), facecolor='white')
for s in range(state_size):
    h, w = states[s] # マスのインデックス
    plt.plot(np.arange(episodes+1), trace_V_arr[:, s], label='$V_n(L_{'+str(h)+','+str(w)+'})$') # 各状態の価値の推移
plt.xlabel('episode')
plt.ylabel('state-value')
plt.suptitle('Monte Carlo Method', fontsize=20)
plt.title('$\gamma='+str(agent.gamma)+'$', loc='left')
plt.grid()
plt.legend()
plt.show()


状態価値関数の推移

 行番号を$h$、列番号を$w$として各マスを$L_{h,w}$で表します。また、$i$回目の状態(マス)$L_{h,w}$の価値を$V_i(L_{h,w})$で表します。マスのインデックスについては図4-9を参照してください。
 各曲線の縦軸の値が、ヒートマップの色に対応します。

 この節では、モンテカルロ法により方策評価を行うエージェントを実装して、状態価値関数を求めました。次節では、行動制御を行うエージェントを実装します。

参考文献

おわりに

 間があいたので?方策評価って何だっけとか思ってしまいましたが、ある方策のときの状態価値を求めることを方策を評価すると言うのでした。
 ベイズの話でモンテカルロ法に触れていたので、4章よりも5章の方が馴染みやすかったです。

【次節の内容】

www.anarchive-beta.com