からっぽのしょこ

読んだら書く!書いたら読む!同じ事は二度調べ(たく)ない

7.4:Q学習とニューラルネットワーク【ゼロつく4のノート】

はじめに

 『ゼロから作るDeep Learning 4 ――強化学習編』の独学時のまとめノートです。初学者の補助となるようにゼロつくシリーズの4巻の内容に解説を加えていきます。本と一緒に読んでください。

 この記事は、7.4節の内容です。ニューラルネットワークを用いてQ学習を実装します。

【前節の内容】

www.anarchive-beta.com

【他の記事一覧】

www.anarchive-beta.com

【この記事の内容】

7.4 Q学習とニューラルネットワーク

 6.4節では、Q学習により行動価値関数を推定して方策を改善(方策を制御)しました。この節では、ニューラルネットワークを用いて行動価値関数の計算を行い、勾配降下法によりQ学習を行います。

 利用するライブラリを読み込みます。

# ライブラリを読み込み
import numpy as np
from dezero import Model
from dezero import optimizers
import dezero.layers as L
import dezero.functions as F

# 追加ライブラリ
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.animation import FuncAnimation

 更新推移をアニメーションで確認するのにmatplotlibのモジュールを利用します。不要であれば省略してください。

 また、3×4マスのグリッドワールドのクラスGridWorldを読み込みます。

# 実装済みのクラスと関数を読み込み
import sys
sys.path.append('../deep-learning-from-scratch-4-master')
from common.gridworld import GridWorld

 実装済みクラスの読み込みについては「3.6.1:MNISTデータセットの読み込み【ゼロつく1のノート(Python)】 - からっぽのしょこ」、GridWorldクラスについては「4.2.1:GridWorldクラスの実装:評価と改善に関するメソッド【ゼロつく4のノート】 - からっぽのしょこ」「4.2.1:GridWorldクラスの実装:可視化に関するメソッド【ゼロつく4のノート】 - からっぽのしょこ」を参照してください。

7.4.1 ニューラルネットワークの前処理

 まずは、縦と横のインデックスで表される状態(マスのインデックス)をone-hotベクトルに変換する関数を実装します。

処理の確認

 one_hot関数の内部で行う処理を確認します。

 グリッドワールドのマス(状態)の情報を利用するために、グリッドワールドのインスタンスを作成しておきます。

# 環境のインスタンスを作成
env = GridWorld()


 3×4のグリッドワールドのマスのインデックス$(y, x)$と、one-hotベクトルのインデックス$i$の対応関係を図で確認します。グリッドワールドの座標については図4-9、作図については4.2.1項を参照してください。

・作図コード(クリックで展開)

# 縦・横のサイズを設定
HEIGHT = 3
WIDTH = 4

# グリッドワールドの図を作成
plt.figure(figsize=(8, 6), facecolor='white') # 図の設定

# 状態ごとに処理
for state in env.states():
    # マスのインデックスを取得
    y, x = state
    
    # ベクトルのインデックスに変換
    idx = WIDTH * y + x

    # 状態を描画
    plt.text(x=x+0.5, y=HEIGHT-y-0.5, s=str(state), 
             ha='center', va='bottom', fontsize=20)
    
    # インデックスを描画
    plt.text(x=x+0.5, y=HEIGHT-y-0.5, s=str(idx), 
             ha='center', va='top', fontsize=20)

plt.xticks(ticks=np.arange(WIDTH)) # x軸の目盛位置
plt.yticks(ticks=np.arange(HEIGHT), labels=HEIGHT-np.arange(HEIGHT)-1) # y軸の目盛位置
plt.xlim(xmin=0, xmax=WIDTH) # x軸の範囲
plt.ylim(ymin=0, ymax=HEIGHT) # y軸の範囲
plt.xlabel('x (width)') # x軸ラベル
plt.ylabel('y (height)') # y軸ラベル
plt.suptitle('Grid World', fontsize=20) # 全体のタイトル
plt.title('state:(y, x), index:i', fontsize=20) # タイトル
plt.grid() # グリッド線
plt.gca().set_aspect('equal') # アスペクト比
plt.show()
# one-hotベクトルの図を作成
plt.figure(figsize=(18, 2.5), facecolor='white') # 図の設定

# 状態ごとに処理
for state in env.states():
    # マスのインデックスを取得
    y, x = state
    
    # ベクトルのインデックスに変換
    idx = WIDTH * y + x

    # 状態を描画
    plt.text(x=idx+0.5, y=0.5, s=str(state), 
             ha='center', va='bottom', fontsize=20)
    
    # インデックスを描画
    plt.text(x=idx+0.5, y=0.5, s=str(idx), 
             ha='center', va='top', fontsize=20)

plt.xticks(ticks=np.arange(HEIGHT*WIDTH)) # x軸の目盛位置
plt.yticks(ticks=np.arange(1)) # y軸の目盛位置
plt.xlim(xmin=0, xmax=HEIGHT*WIDTH) # x軸の範囲
plt.ylim(ymin=0, ymax=1) # y軸の範囲
plt.xlabel('i (input size)') # x軸ラベル
plt.ylabel('n (batch size)') # y軸ラベル
plt.suptitle('one-hot vector', fontsize=20) # 全体のタイトル
plt.title('state:(y, x), index:i', fontsize=20) # タイトル
plt.grid() # グリッド線
plt.gca().set_aspect('equal') # アスペクト比
plt.show()


マスのインデックスとone-hotベクトルのインデックスの対応関係

 上の行から順番に右側に繋げて1行のベクトルにします。
 グリッドワールドの横方向のサイズ(列数)を$W$とすると、(0から数えて)$y$行目の$x$列目は、$W y + x$で計算できます。

 one-hotベクトル化することにより、1つのデータ(状態)を1行で表します。バッチデータ(複数の状態)を扱う場合は、それぞれone-hotベクトル化して行方向に並べた2次元配列にします(8.2.1項)。

 では、one-hotベクトル化の処理を確認します。

 マスのインデックスをベクトルのインデックスに変換します。

# (仮の)状態を設定
state = (1, 2)

# グリッドワールドのインデックスを取得
y, x = state

# one-hotベクトルのインデックスを計算
idx = WIDTH * y + x
print(idx)
6

 値を1にするインデックスをidxとします。

 one-hotベクトルを作成します。

# 受け皿を作成
vec = np.zeros(HEIGHT*WIDTH)
print(vec)

# one-hotベクトルに変換
vec[idx] = 1.0
print(vec)

# 2次元配列に変換
vec = vec[np.newaxis, :]
print(vec)
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]

 全ての要素を0にした1次元配列を作成して、idx番目の要素を1に置き換えます。
 (8章以降で)バッチデータを扱う場合を想定して、2次元配列に変換しておきます。

 ちなみに、1つの要素が1でそれ以外の要素が0なので、np.argmax()で値が1のインデックスを得られます。

# インデックスを取得
idx = np.argmax(vec, axis=1)
print(idx)
[6]


 以上が、one-hotベクトル化の処理です。

実装

 処理の確認ができたので、状態をone-hotベクトルに変換する処理を関数として実装します。

# one-hotベクトル化を実装
def one_hot(state):
    # グリッドワールドのサイズを設定
    HEIGHT, WIDTH = 3, 4
    
    # 受け皿を作成
    vec = np.zeros(HEIGHT*WIDTH, dtype=np.float32)
    
    # グリッドワールドのインデックスを取得
    y, x = state
    
    # one-hotベクトルのインデックスを計算
    idx = WIDTH * y + x
    
    # one-hotベクトルに変換
    vec[idx] = 1.0
    
    # 2次元配列に変換して出力
    return vec[np.newaxis, :]


 実装した関数を試してみましょう。

 各状態をone-hotベクトルに変換します。

# 環境のインスタンスを作成
env = GridWorld()

# 状態ごとに処理
for state in env.states():
    # one-hotベクトルに変換
    x = one_hot(state)
    print(state, x)
(0, 0) [[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
(0, 1) [[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
(0, 2) [[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
(0, 3) [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]]
(1, 0) [[0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]]
(1, 1) [[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]]
(1, 2) [[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]
(1, 3) [[0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]]
(2, 0) [[0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]]
(2, 1) [[0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]
(2, 2) [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]
(2, 3) [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]

 マスのインデックス(カテゴリデータ)をone-hotベクトルで表現できているのを確認できます。

 ちなみに、one-hotベクトルで内積を計算することは、対応する要素を取り出す操作を意味します。

# (仮の)状態を設定
state = (1, 2)

# one-hotベクトルに変換
x = one_hot(state)
print(x)

# インデックスを確認
idx = np.argmax(x, axis=1)
print(idx)

# (仮の)重みを作成
W = np.arange(HEIGHT*WIDTH, dtype=np.float32).reshape(-1, 1)
print(W)

# 状態と重みの内積を計算
w = np.dot(x, W)
print(w)
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]
[6]
[[ 0.]
 [ 1.]
 [ 2.]
 [ 3.]
 [ 4.]
 [ 5.]
 [ 6.]
 [ 7.]
 [ 8.]
 [ 9.]
 [10.]
 [11.]]
[[6.]]

 分かりやすいように0からの整数を並べた重みWを作成しておき、one-hotベクトルの状態xと内積を計算します。
 重みWidx番目の要素を取り出しているのが分かります。つまり、one-hotベクトルの状態と重みの内積の計算によって、状態stateに対応する重みの値を抽出しています。

 以上で、one-hotベクトル化を実装できました。

7.4.2 Q関数を表すニューラルネットワーク

 次は、行動価値関数(Q関数)の近似計算を行う2層のニューラルネットワークを実装します。2層のニューラルネットワークを実装については「7.3.1-3:ニューラルネットワーク【ゼロつく4のノート】 - からっぽのしょこ」を参照してください。

実装

 2層のニューラルネットワークをQ関数のクラスとして実装します。

# ニューラルネットワークを用いたQ関数
class QNet(Model):
    # 初期化メソッドの定義
    def __init__(self):
        # 親クラスのメソッドを継承
        super().__init__()
        
        # レイヤのインスタンスを作成
        self.l1 = L.Linear(100) # 入力層
        self.l2 = L.Linear(4) # 出力層
    
    # 順伝播メソッドの定義
    def forward(self, x):
        # ニューラルネットワークを計算
        x = F.relu(self.l1(x)) # 入力層
        x = self.l2(x) # 出力層
        return x

 中間層(隠れ層)のサイズ(ニューロン数)は、任意の整数を指定できます。出力層のサイズは、上下左右の4つの行動に対応するので4です。
 活性化関数としてReLU関数を使います。

 実装したクラスを試してみましょう。

 行動価値関数(ニューラルネットワーク)のインスタンスを作成します。

# ニューラルネットワークのインスタンスを作成
qnet = QNet()

# パラメータを確認
for param in qnet.params():
    # 値を確認
    print(param)
    
    # 形状を確認
    if param.data is None: # 重みの場合
        print(None)
    else: # バイアスの場合
        print(param.shape)
variable(None)
None
variable([0. 0. 0. 0.])
(4,)
variable(None)
None
variable([0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
          0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
          0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
          0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
          0. 0. 0. 0.])
(100,)

 重みは定義(作成)されておらず、バイアスは0で初期化(作成)されています。

 ニューラルネットワークに状態を入力して、行動価値を出力します。

# (仮の)状態を設定
state = (1, 2)

# one-hotベクトルに変換
x = one_hot(state)
print(x)

# 指定した状態の行動価値を計算
qs = qnet(x)
#qs = qnet.forward(x)
print(qs)
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]
variable([[-0.35878256 -0.4098941   0.14867696 -0.0417503 ]])

 状態(マスのインデックス)stateone-hot()でone-hotベクトルに変換して、qnet()(またはqnet.forward())で2層のニューラルネットワークの順伝播を計算します。
 計算結果が、状態stateにおける上下左右の行動の行動価値です。ただし、まだ学習を行っていないので、ランダムな値が出力されます。

 入力層と出力層のパラメータを確認します。

# パラメータを確認
for param in qnet.params():
    # 値を確認
    if len(param.shape) > 1: # 重みの場合
        print(param[:5, :5])
    else: # バイアスの場合
        print(param[:5])
    
    # 形状を確認
    print(param.shape)
variable([[ 0.10513067  0.0676522   0.11831856  0.02594493]
          [-0.19225346 -0.08629592  0.07928059  0.00693655]
          [-0.03708895 -0.13162747  0.02478896 -0.04269557]
          [-0.01172323  0.10068028  0.01584078  0.04506576]
          [-0.14995448 -0.05980113 -0.05980612 -0.02363988]])
(100, 4)
variable([0. 0. 0. 0.])
(4,)
variable([[-0.24887     0.2420072  -0.06680747 -0.44776544  0.31108776]
          [ 0.2948506   0.5676195  -0.2516151   0.45017698 -0.12430605]
          [ 0.05056989  0.20123398 -0.10073771 -0.40030497  0.04785792]
          [ 0.23835997 -0.13468036 -0.20085484  0.18363592  0.02009848]
          [-0.03784771 -0.28732404  0.4260951   0.23545028  0.6855769 ]])
(12, 100)
variable([0. 0. 0. 0. 0.])
(100,)

 ニューラルネットワークの「入力データ(one-hotベクトル)xの要素数」に対応して、入力層の「重みの行数」が決まります。入力層の「重みの列数」と「バイアスの要素数」、出力層の「重みの行数」は、QNetの内部で指定した「中間層のサイズ」です。出力層の「重みの列数」と「バイアスの要素数」は、「出力層のサイズ」です。
 重み初期値は、標準正規分布に従いランダムに決まります。

 以上で、Q関数として利用する2層のニューラルネットワークを実装できました。

7.4.3 ニューラルネットワークとQ学習

 続いて、ニューラルネットワークを用いた行動価値関数(Q関数)の学習を行うエージェントを実装します。Q学習については、「6.4:Q学習【ゼロつく4のノート】 - からっぽのしょこ」を参照してください。

数式の確認

 ニューラルネットワークを用いたQ学習の更新式を導出します。

Q学習の更新式

 TD法におけるQ学習の更新式は、次の式でした(6.4.2項)。

$$ \begin{align} Q'(S_t, A_t) &= \mathbb{E}_{\pi} \Bigl[ R_t + \gamma \max_{a'} Q(S_{t+1}, a') \Bigm| S_t = s, A_t = a \Bigr] \\ &= Q(S_t, A_t) + \alpha \Bigl\{ R_t + \gamma \max_{a'} Q(S_{t+1}, a') - Q(S_t, A_t) \Bigr\} \tag{6.14} \end{align} $$

 $R_t + \gamma \max_{a'} Q(S_{t+1}, a')$の期待値を、学習率$0 < \alpha < 1$を用いて指数移動平均で近似します。$0 \leq \gamma \leq 1$は収益の計算の割引率です。

 Q学習の更新式(6.14)のターゲットについて

$$ T = R_t + \gamma \max_a Q(S_{t+1}, a) $$

とおきます。

$$ Q'(S_t, A_t) = Q(S_t, A_t) + \alpha \Bigl\{ T - Q(S_t, A_t) \Bigr\} \tag{7.4} $$

 $T = Q(S_t, A_t)$のとき、後の因子が0になるので

$$ Q'(S_t, A_t) = Q(S_t, A_t) $$

となり、更新されません。
 つまり、Q関数$Q(S_t, A_t)$をターゲット$T$に近付けていくことで、Q関数が収束します。

ニューラルネットワークによる近似計算

 行動価値関数(Q関数)を2層のニューラルネットワークで近似することを考えます。

 現在の状態$S_t$をone-hotベクトル$\mathbf{x}$で表します。

$$ \mathbf{x} = \begin{pmatrix} x_0 & x_1 & \cdots & x_{11} \end{pmatrix} ,\ x_i \in \{0, 1\} ,\ \sum_{i=0}^{11} x_i = 1 $$

 Pythonのインデックスに合わせて要素番号(成分番号)を0から数えています。また、この例では入力層のサイズは12です。

 入力層の重みを$\mathbf{W}^{(\mathrm{in})}$、バイアスを$\mathbf{b}^{(\mathrm{in})}$として、全結合層(Affineレイヤ)の計算をします。

$$ \begin{aligned} \mathbf{a} &= \mathbf{x} \mathbf{W}^{(\mathrm{in})} + \mathbf{b}^{(\mathrm{in})} \\ &= \begin{pmatrix} x_0 & x_1 & \cdots & x_{11} \end{pmatrix} \begin{pmatrix} w_{0,0} & w_{0,1} & \cdots & w_{0,H-1} \\ w_{1,0} & w_{1,1} & \cdots & w_{1,H-1} \\ \vdots & \vdots & \ddots & \vdots \\ w_{11,0} & w_{11,1} & \cdots & w_{11,H-1} \end{pmatrix} + \begin{pmatrix} b_0 & b_1 & \cdots & b_{H-1} \end{pmatrix} \\ &= \begin{pmatrix} a_0 & a_1 & \cdots & a_{H-1} \end{pmatrix} \end{aligned} $$

 各要素は、次の計算になります。

$$ a_h = \sum_{i=0}^{11} x_i w_{i,h} + b_h $$

 中間層(隠れ層)のサイズを$H$で表します。
 さらにこの例では、活性化関数をReLU関数とします。

$$ z_h = \mathrm{relu}(a_h) $$

 続いて、出力層の重みを$\mathbf{W}^{(\mathrm{out})}$、バイアスを$\mathbf{b}^{(\mathrm{out})}$として、全結合層(Affineレイヤ)の計算をします。

$$ \begin{aligned} \mathbf{y} &= \mathbf{z} \mathbf{W}^{(\mathrm{out})} + \mathbf{b}^{(\mathrm{out})} \\ &= \begin{pmatrix} z_0 & z_1 & \cdots & z_{H-1} \end{pmatrix} \begin{pmatrix} w_{0,0} & w_{0,1} & w_{0,2} & w_{0,3} \\ w_{1,0} & w_{1,1} & w_{1,2} & w_{1,3} \\ \vdots & \vdots & \vdots & \vdots \\ w_{H-1,0} & w_{H-1,1} & w_{H-1,2} & w_{H-1,3} \end{pmatrix} + \begin{pmatrix} b_0 & b_1 & b_2 & b_3 \end{pmatrix} \\ &= \begin{pmatrix} y_0 & y_1 & y_2 & y_3 \end{pmatrix} \end{aligned} $$

 各要素は、次の計算になります。

$$ y_o = \sum_{h=0}^{H-1} z_h w_{h,o} + b_o $$

 この例では、出力層のサイズは4です。

 ここまでが、ニューラルネットワークの順伝播の計算です。回帰問題では、出力層の活性化関数を恒等関数$y_o = z_o$としますが、計算には影響しないので省略しました。

 ニューラルネットワークの出力データ$\mathbf{y}$の現在の行動$A_t$番目の要素が、状態$S_t$・行動$A_t$の行動価値です。

$$ Q(S_t, A_t) = y_{A_t} $$


パラメータの更新式

 勾配降下法によりニューラルネットワークのパラメータを更新することで、ニューラルネットワークにより行動価値関数を近似します。誤差逆伝播法については「『ゼロから作るDeep Learning』の学習ノート:記事一覧 - からっぽのしょこ」の5章、勾配降下法については6章を参照してください。

 ターゲット$T$とニューラルネットワークの出力データ$y_{A_t}$(行動価値関数の推定値$Q(S_t, A_t)$)の2乗誤差を損失関数(損失レイヤ)とします。

$$ \begin{aligned} L &= (T - y_{A_t})^2 \\ &= (T - Q(S_t, A_t))^2 \end{aligned} $$

 入力データがバッチデータの場合は平均2乗誤差を損失関数とします。

 ここまでが、順伝播における計算です。

 逆伝播では、各パラメータに関する損失関数$L$の微分を求めます。

$$ \begin{aligned} \frac{\partial L}{\partial \mathbf{W}^{(\mathrm{in})}} &= \mathbf{x}^{\top} \frac{\partial L}{\partial \mathbf{a}} \\ \frac{\partial L}{\partial \mathbf{b}^{(\mathrm{in})}} &= \frac{\partial L}{\partial \mathbf{a}} \\ \frac{\partial L}{\partial \mathbf{W}^{(\mathrm{out})}} &= \mathbf{a}^{\top} \frac{\partial L}{\partial \mathbf{y}} \\ \frac{\partial L}{\partial \mathbf{b}^{(\mathrm{out})}} &= \frac{\partial L}{\partial \mathbf{y}} \end{aligned} $$

 DeZeroフレームワークでは、自動微分により計算されるので、具体的な計算式は不要です。

 各パラメータの勾配を用いて、それぞれパラメータを更新します。

$$ \begin{aligned} \mathbf{W}^{(\mathrm{in})} &\leftarrow \mathbf{W}^{(\mathrm{in})} - \eta \frac{\partial L}{\partial \mathbf{W}^{(\mathrm{in})}} \\ \mathbf{b}^{(\mathrm{in})} &\leftarrow \mathbf{b}^{(\mathrm{in})} - \eta \frac{\partial L}{\partial \mathbf{b}^{(\mathrm{in})}} \\ \mathbf{W}^{(\mathrm{out})} &\leftarrow \mathbf{W}^{(\mathrm{out})} - \eta \frac{\partial L}{\partial \mathbf{W}^{(\mathrm{out})}} \\ \mathbf{b}^{(\mathrm{out})} &\leftarrow \mathbf{b}^{(\mathrm{out})} - \eta \frac{\partial L}{\partial \mathbf{b}^{(\mathrm{out})}} \end{aligned} $$

 ここで、$\eta$は学習率です。
 この更新式はSGDによる式です。他の手法を用いる場合は式が異なりますが、こちらもオプティマイザの設定を変更することで行えます(7.3.5項)。

 以上で、ニューラルネットワークを用いたQ学習の更新式が得られました。

処理の確認

 QLearningAgentクラスの内部で行う処理を確認します。

 Q関数として利用する2層のニューラルネットワークと最適化手法のインスタンスを作成します。

# 2層のニューラルネットワーク(Q関数)のインスタンスを作成
qnet = QNet()

# 勾配降下法用の学習率を指定
lr = 0.01

# 最適化手法のインスタンスを作成
optimizer = optimizers.SGD(lr)

# モデルを設定
optimizer.setup(qnet)

 オプティマイザのメソッドsetup()でモデルを設定します。

行動メソッド

 get_actionメソッドの処理を確認します。基本的な処理は「[6.5:サンプルモデル版のQ学習【ゼロつく4のノート】 - からっぽのしょこ」と同様です。

 状態(マスのインデックス)をone-hotベクトルに変換しておきます。これはQLearningAgentクラスの外で行う処理です。

# (仮の)状態を指定
state = (0, 2)

# one-hotベクトルに変換
state = one_hot(state)
print(state)
print(np.argmax(state, axis=1))
[[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[2]


 現在の状態において行動価値が最大となる行動を計算します。

# 現在の状態の全ての行動価値を計算
qs = qnet(state)
print(np.round(qs.data, 3))

# 行動価値が最大の行動を抽出
action  = qs.data.argmax()
print(action)
[[ 0.187 -0.07  -0.202 -0.176]]
0

 one-hotベクトルの状態stateqnetに入力して順伝播を計算し、現在の状態における全ての行動価値を求めます。
 計算結果qsのインデックスが行動番号に対応するので、np.argmax()で最大値のインデックスを取り出します。
 これがgreedy法における処理です。

 続いて、ε-greedy法により行動を決定します。

# ランダムに行動する確率を指定
epsilon = 0.1
#epsilon = 0.9

# 行動の種類数を設定
action_size = 4

# ε-greedy法により行動を決定:式(6.11)
if np.random.rand() < epsilon:
    # ランダムに行動を出力
    print(np.random.choice(action_size))
else:
    # 現在の状態の全ての行動価値を計算(順伝播を計算)
    qs = qnet(state)
    
    # 行動価値が最大の行動を出力
    print(qs.data.argmax())
0

 0から1の一様乱数をnp.random.rand()を生成して、ランダムに行動する確率epsilonと比較します。epsilonより小さければ($\epsilon$未満の確率で)、np.random.choice()でランダムな行動を出力します。epsilon以上であれば($1-\epsilon$の確率で)、行動価値が最大となる行動を出力します。

更新メソッド

 続いて、updateメソッドの処理を確認します。

 行動メソッドと同様に、次の状態をone-hotベクトルに変換しておきます。

# (仮の)次の状態を指定
next_state = (0, 3)

# one-hotベクトルに変換
next_state = one_hot(next_state)
print(next_state)
print(np.argmax(next_state, axis=1))
[[0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]]
[3]


 次の状態における行動価値の最大値を計算します。

# (仮の)コードフラグを設定
done = True
#done = False

# 次の状態の行動価値の最大値を取得
if done: # ゴールの場合
    # 次の行動価値を0に設定
    next_q = np.zeros(1)
else: # ゴール以外の場合
    # 次の状態の全ての行動価値を計算(順伝播を計算)
    next_qs = qnet(next_state)
    print(np.round(next_qs.data, 3))

    # 行動価値の最大値を取得
    next_q = next_qs.max(axis=1)
        
    # 勾配の計算から除外
    next_q.unchain()
print(np.round(next_q.data, 3))
[0.]

 qnetの順伝播メソッド(を省略した記法)で、次の状態next_stateにおける全ての行動価値を計算します。
 計算結果next_qsの最大値をnp.max()で取り出します。ただし、次の状態がゴールマスの場合は、行動価値を0にします。

 この計算は、ゴールフラグを整数型にして使うことで、if文を使わず(条件分岐せず)に処理できます。

# 因子型から整数型に変換
done = int(done)
print(done)

# 次の状態の全ての行動価値を計算(順伝播を計算)
next_qs = qnet(next_state)
print(np.round(next_qs.data, 3))

# 行動価値の最大値を取得
next_q = next_qs.max(axis=1)
print(np.round((1 - done) * next_q.data, 3))

# 勾配の計算から除外
next_q.unchain()
1
[[-0.068 -0.032 -0.299 -0.441]]
[-0.]

 doneTrueのときにnext_q0になればいいので、doneint()で整数型にして1-donenext_qに掛けます。

 簡単な例で確認します。

# 値を指定
val = 10.0

# フラグを指定
flag = False

# 整数型に変換
flag = int(flag)
print(flag)
print(1 - flag)

# フラグに従い値を変換
val = (1 - flag) * val
print(val)

# フラグを変更
flag = True

# 整数型に変換
flag = int(flag)
print(flag)
print(1 - flag)

# フラグに従い値を変換
val = (1 - flag) * val
print(val)
0
1
10.0
1
0
0.0

 doneFalseのときint(done)0になるので、1-done1になります。よって、1-doneを掛けても値はそのままです。
 逆に、doneTrueのときint(done)1になるので、1-done0になります。よって、1-doneを掛けると値が0になります。

 次の状態の行動価値の最大値next_qを使って、ターゲットを計算します。

# 収益の計算用の割引率を指定
gamma = 0.9

# (仮の)行動を設定
action = 3

# (仮の)報酬を設定
reward = 1

# ターゲット(正解ラベル)を計算
target = reward + (1 - done) * gamma * next_q
print(target.data)
[1.]

 1-doneを使って(条件分岐をせずに)、Q学習の更新式(7.3)のターゲット$R_t + \gamma \max_a Q(S_{t+1}, a)$を計算します。
 計算結果targetを正解ラベルとして使います。

 現在の状態・行動の行動価値を計算します。

# (仮の)状態を指定
state = (0, 2)

# one-hotベクトルに変換
state = one_hot(state)
print(state)
print(np.argmax(state, axis=1))

# 現在の状態の全ての行動価値を計算(順伝播を計算)
qs = qnet(state)
print(np.round(qs.data, 3))

# 現在の状態・行動の行動価値を取得
q = qs[:, action]
print(np.round(q.data, 3))
[[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[2]
[[ 0.187 -0.07  -0.202 -0.176]]
[-0.176]

 qnetの順伝播メソッドで、現在の状態stateにおける全ての行動価値を計算します。
 計算結果qsからaction番目の要素を取り出します。この値が$Q(S_t, A_t)$です。

 ターゲット(正解ラベル)targetと現在の行動価値qを使って、損失関数(損失レイヤの順伝播)として平均2乗誤差を計算します。

# 損失関数(損失レイヤの順伝播)を計算
loss = F.mean_squared_error(target, q)
print(loss.data)
1.3829426765441895

 targetqの値が近いほど損失が小さくなります。

 ここまでが順伝播の計算です。

 後ほどグラフで確認するために、パラメータの初期値を保存しておきます。

# (確認用に)入力層のパラメータを取得
old_W1 = qnet.l1.W.data.copy()
old_b1 = qnet.l1.b.data.copy()
print(old_W1.shape)
print(old_b1.shape)

# (確認用に)出力層のパラメータを取得
old_W2 = qnet.l2.W.data.copy()
old_b2 = qnet.l2.b.data.copy()
print(old_W2.shape)
print(old_b2.shape)
(12, 100)
(100,)
(100, 4)
(4,)


 順伝播の計算しか行っていない状態だと、勾配を持ちません。

# パラメータの勾配を確認
print(qnet.l1.W.grad)
print(qnet.l1.b.grad)
print(qnet.l2.W.grad)
print(qnet.l2.b.grad)
None
None
None
None


 勾配を計算して、パラメータを更新します。

# 勾配を初期化
qnet.cleargrads()

# 勾配(逆伝播)を計算
loss.backward()

# 勾配降下法によりパラメータを更新
optimizer.update()

 qnetの逆伝播メソッドbackward()で、各パラメータの勾配を計算します。
 さらに、オプティマイザの更新メソッドupdate()で、各パラメータを更新します。

 勾配が計算されているのを確認します。

# パラメータの勾配を確認
print(qnet.l1.W.grad.shape)
print(qnet.l1.b.grad.shape)
print(qnet.l2.W.grad.shape)
print(qnet.l2.b.grad.shape)
(12, 100)
(100,)
(100, 4)
(4,)


 以上が、ニューラルネットワークを用いたQ学習を行うエージェントの処理です。

パラメータの更新の様子

 ついでに、パラメータの更新の様子をヒートマップで可視化します。メソッドの処理とは無関係です。

 入力層の重みパラメータとその勾配をヒートマップで確認します。ただし、図を分かりやすくするため、パラメータの勾配の値に応じてグラデーションの最小値・最大値を変更しています。

・作図コード(クリックで展開)

# 更新前の入力層の重み
plt.figure(figsize=(12, 2.5))
plt.imshow(old_W1, cmap='bwr', vmin=-1.0, vmax=1.0)
plt.xlabel('hidden size')
plt.ylabel('input size')
plt.suptitle('$W_{in}^{old}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()

# 更新後の入力層の重み
plt.figure(figsize=(12, 2.5))
plt.imshow(qnet.l1.W.data, cmap='bwr', vmin=-1.0, vmax=1.0)
plt.xlabel('hidden size')
plt.ylabel('input size')
plt.suptitle('$W_{in}^{new}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()

# 入力層の重みの勾配
plt.figure(figsize=(12, 2.5))
plt.imshow(qnet.l1.W.grad.data, cmap='bwr', vmin=-1.0, vmax=1.0)
plt.xlabel('hidden size')
plt.ylabel('input size')
plt.suptitle('$dW_{in}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()

# 入力層の重みの更新量
plt.figure(figsize=(12, 2.5))
plt.imshow(qnet.l1.W.data-old_W1, cmap='bwr', vmin=-0.01, vmax=0.01)
plt.xlabel('hidden size')
plt.ylabel('input size')
plt.suptitle('$W_{in}^{new} - W_{in}^{old}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()


入力層の重みパラメータに関するヒートマップ

 重みは、標準正規分布に従いランダムに初期化されるので、初期値の段階では不規則に色が付いています。
 one-hotベクトルの入力データと重みの積は、対応する重みを取り出す操作なのでした(7.4.1項)。また、勾配は、順伝播の計算で使われた要素のみが値を持ちます。
 この例では、2番目の要素が1のone-hotベクトル(状態(0, 2))を入力しました。そのため、2行目のみが逆伝播の計算の影響を受けて(勾配が伝播して)います。
 また、勾配の値が0だとパラメータも更新されないので、2行目の値のみが更新されています。

 同様に、入力層のバイアスパラメータとその勾配を確認します。

・作図コード(クリックで展開)

# 更新前の入力層のバイアス
plt.figure(figsize=(12, 1.5))
plt.imshow(old_b1.reshape((1, -1)), cmap='bwr', vmin=-0.01, vmax=0.01)
plt.yticks(ticks=[0])
plt.xlabel('hidden size')
plt.suptitle('$b_{in}^{old}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()

# 更新後の入力層のバイアス
plt.figure(figsize=(12, 1.5))
plt.imshow(qnet.l1.b.data.reshape((1, -1)), cmap='bwr', vmin=-0.01, vmax=0.01)
plt.yticks(ticks=[0])
plt.xlabel('hidden size')
plt.suptitle('$b_{in}^{new}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()

# 入力層のバイアスの勾配
plt.figure(figsize=(12, 1.5))
plt.imshow(qnet.l1.b.grad.data.reshape((1, -1)), cmap='bwr', vmin=-0.01, vmax=0.01)
plt.yticks(ticks=[0])
plt.xlabel('hidden size')
plt.suptitle('$db_{in}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()

# 入力層のバイアスの更新量
plt.figure(figsize=(12, 1.5))
plt.imshow((qnet.l1.b.data-old_b1).reshape((1, -1)), cmap='bwr', vmin=-0.01, vmax=0.01)
plt.yticks(ticks=[0])
plt.xlabel('hidden size')
plt.suptitle('$b_{in}^{new} - b_{in}^{old}$', fontsize=20)
plt.title('state index:'+str(state.argmax()), loc='left')
plt.show()


入力層のバイアスパラメータに関するヒートマップ

 バイアスの初期値は0なので、初期値の段階では全て白色です。
 入力層のバイアスは、入力データの値に関わらず全ての要素が順伝播の計算に使われます。そのため、入力層バイアスの勾配は、全ての要素が値を持ちます。ただし、他のパラメータやその勾配の値によって0(に近い値)になることはあります。
 よって、全ての要素が更新されています。

 出力層の重みパラメータを確認します。

・作図コード(クリックで展開)

# 更新前の出力層の重み
plt.figure(figsize=(4, 10))
plt.imshow(old_W2, cmap='bwr', vmin=-1.0, vmax=1.0)
plt.xticks(ticks=np.arange(4))
plt.xlabel('output size')
plt.ylabel('hidden size')
plt.suptitle('$W_{out}^{old}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()

# 更新後の出力層の重み
plt.figure(figsize=(4, 10))
plt.imshow(qnet.l2.W.data, cmap='bwr', vmin=-1.0, vmax=1.0)
plt.xticks(ticks=np.arange(4))
plt.xlabel('output size')
plt.ylabel('hidden size')
plt.suptitle('$W_{out}^{new}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()

# 出力層の重みの勾配
plt.figure(figsize=(4, 10))
plt.imshow(qnet.l2.W.grad.data, cmap='bwr', vmin=-1.0, vmax=1.0)
plt.xticks(ticks=np.arange(4))
plt.xlabel('output size')
plt.ylabel('hidden size')
plt.suptitle('$dW_{out}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()

# 出力層の重み更新量
plt.figure(figsize=(4, 10))
plt.imshow(qnet.l2.W.data-old_W2, cmap='bwr', vmin=-0.05, vmax=0.05)
plt.xticks(ticks=np.arange(4))
plt.xlabel('output size')
plt.ylabel('hidden size')
plt.suptitle('$W_{out}^{new} - W_{out}^{old}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()


出力層のバイアスパラメータに関するヒートマップ

 順伝播の計算において、出力層(ニューラルネットワーク)の計算結果qsからaction番目の要素を取り出して、損失関数(損失層)の計算をしました。そのため、逆伝播の計算もaction番目に関する要素のみに影響(勾配が伝播)します。
 この例では、3番目の行動を取りました。よって、3列目の要素のみが勾配を持ち、更新されています。

 出力層のバイアスパラメータを確認します。

・作図コード(クリックで展開)

# 更新前の出力層のバイアス
plt.figure(figsize=(4, 2))
plt.imshow(old_b2.reshape((1, -1)), cmap='bwr', vmin=-0.1, vmax=0.1)
plt.yticks(ticks=[0])
plt.xlabel('output size')
plt.suptitle('$b_{out}^{old}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()

# 更新後の出力層のバイアス
plt.figure(figsize=(4, 2))
plt.imshow(qnet.l2.b.data.reshape((1, -1)), cmap='bwr', vmin=-0.1, vmax=0.1)
plt.yticks(ticks=[0])
plt.xlabel('output size')
plt.suptitle('$b_{out}^{new}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()

# 出力層のバイアスの勾配
plt.figure(figsize=(4, 2))
plt.imshow(qnet.l2.b.grad.data.reshape((1, -1)), cmap='bwr', vmin=-0.1, vmax=0.1)
plt.yticks(ticks=[0])
plt.xlabel('output size')
plt.suptitle('$db_{out}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()

# 出力層のバイアスの更新量
plt.figure(figsize=(4, 2))
plt.imshow((qnet.l2.b.data-old_b2).reshape((1, -1)), cmap='bwr', vmin=-0.1, vmax=0.1)
plt.yticks(ticks=[0])
plt.xlabel('output size')
plt.suptitle('$b_{out}^{new} - b_{out}^{old}$', fontsize=20)
plt.title('action:'+str(action), loc='left')
plt.show()


出力層のバイアスパラメータに関するヒートマップ

 入力層のバイアスパラメータや出力層の重みパラメータと同様に、action列目の要素のみが逆伝播の影響を受けます。

 さて、話を実装に戻します。

実装

 処理の確認ができたので、ニューラルネットワークを用いたQ学習におけるエージェントをクラスとして実装します。

# NNを用いたQ学習によるエージェントの実装
class QLearningAgent:
    # 初期化メソッドの定義
    def __init__(self):
        # ハイパーパラメータを指定
        self.gamma = 0.9 # 収益の計算用の割引率
        self.lr = 0.01 # 勾配降下法用の学習率
        self.epsilon = 0.1 # ランダムに行動する確率
        self.action_size = 4 # 行動の種類数
        
        # モデルのインスタンスを作成
        self.qnet = QNet() # Q関数(2層のNN)
        self.optimizer = optimizers.SGD(self.lr) # 最適化手法
        self.optimizer.setup(self.qnet) # モデルを設定
    
    # 行動メソッドの定義
    def get_action(self, state):
        # ε-greedy法により行動を決定:式(6.11)
        if np.random.rand() < self.epsilon:
            # ランダムに行動を出力
            return np.random.choice(self.action_size)
        else:
            # 現在の状態の全ての行動価値を計算(順伝播を計算)
            qs = self.qnet(state)
            
            # 行動価値が最大の行動を出力
            return qs.data.argmax()
    
    # 更新メソッドの定義
    def update(self, state, action, reward, next_state, done):
        # 因子型から整数型に変換
        done = int(done)
        
        # 次の状態の全ての行動価値を計算(順伝播を計算)
        next_qs = self.qnet(next_state)
        
        # 行動価値の最大値を取得
        next_q = next_qs.max(axis=1)
        
        # 勾配の計算から除外
        next_q.unchain()
        
        # ターゲット(正解ラベル)を計算
        target = reward + (1 - done) * self.gamma * next_q
        
        # 現在の状態の全ての行動価値を計算(順伝播を計算)
        qs = self.qnet(state)
        
        # 現在の状態・行動の行動価値を取得
        q = qs[:, action]
        
        # 損失関数(損失レイヤの順伝播)を計算
        loss = F.mean_squared_error(target, q)
        
        # 勾配を初期化
        self.qnet.cleargrads()
        
        # 勾配(逆伝播)を計算
        loss.backward()
        
        # 勾配降下法によりNNのパラメータ(Q関数)を更新
        self.optimizer.update()
        
        # 損失を出力
        return loss.data

 (本には書いてませんが記事では、)6.4.3項で実装したQLearningAgentget_action()では、最大値が複数ある場合に最小のインデックスを出力するnp.argmax()ではなく、(5.4.3項で実装した)ランダムにインデックスを出力するargmax()を使いました。
 ニューラルネットワークを用いたQLearningAgentでは、重みがランダムに初期化されるため、基本的に行動価値が同じ値になることはありません。そのため、np.argmax()を使って実装します。

 実装したクラスを試してみましょう。

 環境(グリッドワールド)とエージェントのインスタンスを作成して、1エピソードの処理を行います。

# 環境・エージェントのインスタンスを作成
env = GridWorld()
agent = QLearningAgent()

# 行動の表示用のリストを作成
arrows = ['↑', '↓', '←', '→']
states = [state for state in env.states()]

# 最初の状態を設定
state = env.reset()
done = False

# 最初の状態をone-hotベクトルに変換
state = one_hot(state)

# 平均損失の計算用のオブジェクトを初期化
total_loss, cnt = 0, 0

# 推移の可視化用のリストを初期化
trace_total_loss = []

# 1エピソードのシミュレーション(ゴールに着いたらエピソードを終了)
while not done:
    # ε-greedy法により行動を決定
    action = agent.get_action(state)

    # サンプルデータを取得
    next_state, reward, done = env.step(action)

    # 次の状態をone-hotベクトルに変換
    next_state = one_hot(next_state)

    # Q関数(NNのパラメータ)を更新
    loss = agent.update(state, action, reward, next_state, done)

    # 合計損失を計算
    total_loss += loss

    # 試行回数(時刻)をカウント
    cnt += 1

    # サンプルデータを表示
    print(
        't=' + str(cnt) + 
        ', S_t=' + str(states[state.argmax()]) + 
        ', A_t=' + arrows[action] + 
        ', S_t+1=' + str(states[next_state.argmax()]) + 
        ', R_t=' + str(reward) + 
        ', loss=' + str(np.round(loss, 5))
    )
    
    # 状態を更新
    state = next_state
    
    # 合計損失を記録
    trace_total_loss.append(total_loss)

# 平均損失を計算
average_loss = total_loss / cnt
print('average loss=' + str(average_loss))
t=1, S_t=(2, 0), A_t=→, S_t+1=(2, 1), R_t=0, loss=0.00882
t=2, S_t=(2, 1), A_t=←, S_t+1=(2, 0), R_t=0, loss=0.00108
t=3, S_t=(2, 0), A_t=→, S_t+1=(2, 1), R_t=0, loss=0.00645
t=4, S_t=(2, 1), A_t=←, S_t+1=(2, 0), R_t=0, loss=0.00045
t=5, S_t=(2, 0), A_t=→, S_t+1=(2, 1), R_t=0, loss=0.00481
t=6, S_t=(2, 1), A_t=←, S_t+1=(2, 0), R_t=0, loss=0.00015
t=7, S_t=(2, 0), A_t=→, S_t+1=(2, 1), R_t=0, loss=0.00367
t=8, S_t=(2, 1), A_t=←, S_t+1=(2, 0), R_t=0, loss=2e-05
t=9, S_t=(2, 0), A_t=←, S_t+1=(2, 0), R_t=0, loss=0.05493
t=10, S_t=(2, 0), A_t=→, S_t+1=(2, 1), R_t=0, loss=0.00203
(省略)
t=41, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=1e-05
t=42, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=43, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=44, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=45, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=46, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=47, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=48, S_t=(0, 2), A_t=↑, S_t+1=(0, 2), R_t=0, loss=0.0
t=49, S_t=(0, 2), A_t=→, S_t+1=(0, 3), R_t=1.0, loss=1.12478
average loss=0.035065107255924925

 agentget_action()で挙動方策に従い行動して、envstep()で状態を遷移し報酬を出力します。
 得られたサンプルデータ(one-hotベクトルの現在の状態・行動・報酬・one-hotベクトルの次の状態)を使って、agentupdate()で行動価値関数(ニューラルネットワークのパラメータ)を勾配降下法により更新します。
 ゴールマスに着くとdoneTrueに設定されるので、while文のループ処理が終了します。

 1エピソードでの平均損失の推移をグラフで確認します。

# 平均損失の推移を作図
plt.figure(figsize=(8, 6))
plt.plot(np.arange(1, cnt+1), trace_total_loss/np.arange(1, cnt+1))
plt.xlabel('t')
plt.ylabel('average loss')
plt.suptitle('Mean Squared Error', fontsize=20)
plt.grid()
plt.show()

1エピソードの損失関数の推移


 ヒートマップの作図用に、行動価値関数のディクショナリを作成します。

# 受け皿を作成
Q = {}

# 状態と行動ごとに処理
for state in env.states():
    for action in env.action_space:
        # 状態をone-hotベクトルに変換
        one_hot_state = one_hot(state)
        
        # 全ての行動価値を出力
        qs = agent.qnet(one_hot_state)
        
        # 状態・行動の行動価値を抽出
        q = qs[:, action]
        
        # 値を格納
        Q[state, action] = float(q.data)
print(list(Q.keys())[:6])
print(np.round(list(Q.values())[:6], 2))
[((0, 0), 0), ((0, 0), 1), ((0, 0), 2), ((0, 0), 3), ((0, 1), 0), ((0, 1), 1)]
[-0.22  0.12 -0.08  0.18 -0.25  0.13]

 各状態・行動の行動価値を取り出して、ディクショナリQに格納します。

 リスト内包表記を使って次のようにも処理できます。

# 行動価値関数のディクショナリを作成
Q = {(state, action): float(agent.qnet(one_hot(state))[:, action].data) for state in env.states() for action in env.action_space}
print(list(Q.keys())[:6])
print(np.round(list(Q.values())[:6], 2))
[((0, 0), 0), ((0, 0), 1), ((0, 0), 2), ((0, 0), 3), ((0, 1), 0), ((0, 1), 1)]
[-0.22  0.12 -0.08  0.18 -0.25  0.13]


 行動価値関数をヒートマップで確認します。

# 行動価値関数のヒートマップと方策ラベルを作図
env.render_q(q=Q)

1エピソード更新した行動価値関数のヒートマップと方策ラベル

 TD法によるQ学習(6.4節)の1エピソードの学習後の結果と比較すると、ニューラルネットワークによるQ学習の方は、各マスで赤や緑に色付いています。これはそれぞれ次の理由からです。
 5章や6章では、行動価値関数を0で初期化していたので、1エピソードの学習では報酬のあるマス付近の行動価値しか更新されません(値が変わりません)でした。そのため、白いマス(0のままのマス)が多くなりました。
 ニューラルネットワークを用いた実装では、ニューラルネットワークの重みをランダムな値で初期化するので、試行の度に行動価値が更新され(値が変化し)ます。ただし、これは重みの初期値に依存したもので、このこと自体は学習が早いことを意味しません。
 そのため、未学習の行動価値関数(ニューラルネットワーク)でもランダムに色が付きます。

# エージェントのインスタンスを作成
agent = QLearningAgent()

# 行動価値関数のディクショナリを作成
Q = {(state, action): float(agent.qnet(one_hot(state))[:, action].data) for state in env.states() for action in env.action_space}

# 行動価値関数のヒートマップと方策ラベルを作図
env.render_q(q=Q, print_value=False)

行動価値関数の初期値のヒートマップ


 以上で、2層のニューラルネットワークを用いたQ学習のエージェントを実装できました。

・ニューラルネットワークを用いたQ学習による方策制御

 最後に、ニューラルネットワークと勾配降下法を用いたQ学習により行動価値関数を推定して、更新の推移を確認します。

推定

 ニューラルネットワークを用いたQ学習により行動価値関数を繰り返し更新します。

# 環境・エージェントのインスタンスを作成
env = GridWorld()
agent = QLearningAgent()

# エピソード数を指定
episodes = 1000

# 推移の可視化用のリストを初期化
trace_loss = []
Q = {(state, action): float(agent.qnet(one_hot(state))[:, action].data) for state in env.states() for action in env.action_space}
trace_Q = [Q] # 初期値を記録
trace_W1 = [agent.qnet.l1.W.data.copy()]
trace_b1 = [agent.qnet.l1.b.data.copy()]
trace_W2 = [agent.qnet.l2.W.data.copy()]
trace_b2 = [agent.qnet.l2.b.data.copy()]

# 繰り返しシミュレーション
for episode in range(episodes):
    # 状態を初期化
    state = env.reset()
    done = False
    
    # 最初の状態をone-hotベクトルに変換
    state = one_hot(state)
    
    # 平均損失の計算用のオブジェクトを初期化
    total_loss, cnt = 0, 0
    
    # 1エピソードのシミュレーション(ゴールに着いたらエピソードを終了)
    while not done:
        # ε-greedy法により行動を決定
        action = agent.get_action(state)
        
        # サンプルデータを取得
        next_state, reward, done = env.step(action)
        
        # 次の状態をone-hotベクトルに変換
        next_state = one_hot(next_state)
        
        # Q関数(NNのパラメータ)を更新
        loss = agent.update(state, action, reward, next_state, done)
        
        # 合計損失を計算
        total_loss += loss
        
        # 試行回数をカウント
        cnt += 1
        
        # 状態を更新
        state = next_state
    
    # 平均損失を計算
    average_loss = total_loss / cnt
    
    # 更新値を記録
    trace_loss.append(average_loss)
    Q = {(state, action): float(agent.qnet(one_hot(state))[:, action].data) for state in env.states() for action in env.action_space}
    trace_Q.append(Q)
    trace_W1.append(agent.qnet.l1.W.data.copy())
    trace_b1.append(agent.qnet.l1.b.data.copy())
    trace_W2.append(agent.qnet.l2.W.data.copy())
    trace_b2.append(agent.qnet.l2.b.data.copy())
    
    # 一定回数ごとに平均損失を表示
    if (episode+1) % 100 == 0:
        print('epsiode '+str(episode+1) + ', T='+str(cnt) + ', loss='+str(average_loss))
epsiode 100, T=5, loss=0.0001039496290900388
epsiode 200, T=6, loss=0.0024801804555257454
epsiode 300, T=5, loss=1.443999701677967e-06
epsiode 400, T=5, loss=4.3088687817771644e-08
epsiode 500, T=5, loss=1.938789273481234e-06
epsiode 600, T=5, loss=1.5689485053371754e-07
epsiode 700, T=5, loss=1.6136844749325973e-09
epsiode 800, T=5, loss=1.662957978965096e-06
epsiode 900, T=5, loss=1.332279850885243e-07
epsiode 1000, T=5, loss=7.149915316517763e-06

 スタートマスからε-greedy法により行動し、ゴールマスに着くまでを1エピソードとします。エピソードごとに、GridWorldクラスのreset()メソッドで状態を初期化し(エージェントをスタートマスに戻し)、ゴールフラグdoneFalseに設定します。
 episodesに指定した回数のシミュレーションを行い、時刻ごとに繰り返しagentupdate()でニューラルネットワークのパラメータを勾配降下法により更新します。行動価値関数の値の抽出処理については前項を参照してください。

 推移の確認用に、損失関数や行動価値関数、各パラメータの更新値をそれぞれtrace_*に格納していきます。

 平均損失の推移をグラフで確認します。

# 平均損失の推移を作図
plt.figure(figsize=(8, 6), facecolor='white')
plt.plot(np.arange(episodes), trace_loss)
plt.xlabel('epsiode')
plt.ylabel('average loss')
plt.suptitle('Mean Squared Error', fontsize=20)
plt.grid()
plt.show()

損失関数の推移

 結果の解釈については本を参照してください。

 推定した行動価値関数をヒートマップと方策ラベルで確認します。

# 行動価値関数のヒートマップを作成
env.render_q(q=Q)

行動価値関数のヒートマップと方策ラベル

 結果の解釈については本を参照してください。

更新推移の可視化

 ここまでで、繰り返しの更新処理を確認しました。続いて、途中経過をアニメーションで確認します。

 行動価値関数のヒートマップのアニメーションを作成します。

・作図コード(クリックで展開)

# グリッドマップのサイズを取得
xs = env.width
ys = env.height

# 状態価値の最大値・最小値を取得
qmax = max([max(trace_Q[i].values()) for i in range(len(trace_Q))])
qmin = min([min(trace_Q[i].values()) for i in range(len(trace_Q))])

# 色付け用に最大値・最小値を再設定
qmax = max(qmax, abs(qmin))
qmin = -1 * qmax
qmax = 1 if qmax < 1 else qmax
qmin = -1 if qmin > -1 else qmin

# カラーマップを設定
color_list = ['red', 'white', 'green']
cmap = LinearSegmentedColormap.from_list('colormap_name', color_list)

# 図を初期化
fig = plt.figure(figsize=(10, 7.5), facecolor='white') # 図の設定
plt.suptitle('Q-Learning with Neural Network', fontsize=20) # 全体のタイトル

# 作図処理を関数として定義
def update(i):
    # 前フレームのグラフを初期化
    plt.cla()
    
    # i回目の更新値を取得
    Q = trace_Q[i]
    
    # マス(状態)ごとに処理
    for state in env.states():
        # 行動ごとに処理
        for action in env.action_space:
            # インデックスを取得
            y, x = state
            
            # 報酬を抽出
            r = env.reward_map[y, x]
            
            # 報酬がある場合
            if r != 0 and r is not None:
                # 報酬ラベル用の文字列を作成
                txt = 'R ' + str(r)

                # ゴールの場合
                if state == env.goal_state:
                    # 報酬ラベルにゴールを追加
                    txt = txt + ' (GOAL)'

                # 報酬ラベルを描画
                plt.text(x=x+0.1, y=ys-y-0.9, s=txt, 
                         ha='left', va='bottom', fontsize=15)
            
            # ゴールの場合
            if state == env.goal_state:
                # 描画せず次の状態へ
                continue

            # 作図用のx軸・y軸の値を設定
            tx, ty = x, ys-y-1

            # 行動ごとの三角形の頂点を設定
            action_map = {
                0: ((0.5+tx, 0.5+ty), (1.0+tx, 1.0+ty), (tx, 1.0+ty)), # 上
                1: ((tx, ty), (1.0+tx, ty), (0.5+tx, 0.5+ty)), # 下
                2: ((tx, ty), (0.5+tx, 0.5+ty), (tx, 1.0+ty)), # 左
                3: ((0.5+tx, 0.5+ty), (1.0+tx, ty), (1.0+tx, 1.0+ty)) # 右
            }

            # 行動ごとの価値ラベルのプロット位置を設定
            offset_map = {
                0: (0.5, 0.75), # 上
                1: (0.5, 0.25), # 下
                2: (0.25, 0.5), # 左
                3: (0.75, 0.5)  # 右
            }

            # 壁の場合
            if state == env.wall_state:
                # 壁を描画
                rect = plt.Rectangle(xy=(tx, ty), width=1, height=1, 
                                     fc=(0.4, 0.4, 0.4, 1.0)) # 長方形を作成
                plt.gca().add_patch(rect) # 重ねて描画
            # (よく分からない)
            elif state in env.goal_state:
                plt.gca().add_patch(plt.Rectangle(xy=(tx, ty), width=1, height=1, fc=(0.0, 1.0, 0.0, 1.0)))
            # 壁以外の場合
            else:
                # 行動価値を抽出
                tq = Q[(state, action)]

                # 行動価値を0から1に正規化
                color_scale = 0.5 + (tq / qmax) / 2

                # 三角形を描画
                poly = plt.Polygon(action_map[action],fc=cmap(color_scale)) # 三角形を作成
                plt.gca().add_patch(poly) # 重ねて描画

                # プロット位置の調整値を取得
                offset = offset_map[action]

                # 行動価値ラベルを描画
                plt.text(x=tx+offset[0], y=ty+offset[1], s=str(np.round(tq, 3)), 
                         ha='center', va='center', size=15) # 行動価値ラベル
    
    # グラフの設定
    plt.xticks(ticks=np.arange(xs)) # x軸の目盛位置
    plt.yticks(ticks=np.arange(ys), labels=ys-np.arange(ys)-1) # y軸の目盛位置
    plt.xlim(xmin=0, xmax=xs) # x軸の範囲
    plt.ylim(ymin=0, ymax=ys) # y軸の範囲
    plt.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False) # 軸ラベル
    plt.grid() # グリッド線
    plt.title('episode:'+str(i), loc='left') # タイトル

# gif画像を作成
anime = FuncAnimation(fig=fig, func=update, frames=len(trace_Q), interval=50)

# gif画像を保存
anime.save('QLearning.gif')

 各エピソードで更新した行動価値をtrace_Qから取り出してヒートマップを描画する処理を関数update()として定義して、FuncAnimation()でアニメーション(gif画像)を作成します。


行動価値関数のヒートマップの推移

 行動価値関数の更新値の推移を折れ線グラフで確認します。

・作図コード(クリックで展開)

# 行動ラベルを設定
arrows = ['↑', '↓', '←', '→']

# 状態価値関数の推移を作図
plt.figure(figsize=(15, 10), facecolor='white')
for state in env.states():
    for action in range(agent.action_size):
        # 更新値を抽出
        q_vals = [trace_Q[i][(state, action)] for i in range(episodes+1)]
        
        # 各状態の価値の推移を描画
        plt.plot(np.arange(episodes+1), q_vals, 
                 alpha=0.5, label='$Q_i(L_{'+str(state[0])+','+str(state[1])+'},'+arrows[action]+')$')
plt.xlabel('episode')
plt.ylabel('action-value')
plt.suptitle('Q-Learning with Neural Network', fontsize=20)
plt.title('$\gamma='+str(agent.gamma)+'$', loc='left')
plt.grid()
plt.legend(loc='upper left', bbox_to_anchor=(1, 1), ncol=2)
plt.show()


行動価値関数の推移

 行番号を$h$、列番号を$w$として各マスを$L_{h,w}$で表します(図4-9)。また、$i$回目の行動価値を$Q_i(L_{h,w}, A)$で表します。
 各曲線の縦軸の値が、ヒートマップの色に対応します。

 各パラメータの更新値の推移をグラフで確認します。

・作図コード(クリックで展開)

# 入力層の重みパラメータの推移を作図
plt.figure(figsize=(12, 9), facecolor='white')
for s in range(agent.qnet.l1.W.data.shape[0]):
    for h in range(agent.qnet.l1.W.data.shape[1]):
        w_vals = [trace_W1[i][s, h] for i in range(episodes+1)]
        plt.plot(np.arange(episodes+1), w_vals, alpha=0.5)
plt.xlabel('epsiode')
plt.ylabel('value')
plt.suptitle('input layer weight parameters', fontsize=20)
plt.grid()
plt.show()
# 入力層の重みパラメータの推移を作図
plt.figure(figsize=(12, 9), facecolor='white')
for h in range(agent.qnet.l1.b.data.shape[0]):
    b_vals = [trace_b1[i][h] for i in range(episodes+1)]
    plt.plot(np.arange(episodes+1), b_vals, alpha=0.5)
plt.xlabel('epsiode')
plt.ylabel('value')
plt.suptitle('input layer bias parameters', fontsize=20)
plt.grid()
plt.show()
# 出力層の重みパラメータの推移を作図
plt.figure(figsize=(12, 9), facecolor='white')
for h in range(agent.qnet.l2.W.data.shape[0]):
    for a in range(agent.qnet.l2.W.data.shape[1]):
        w_vals = [trace_W1[i][s, h] for i in range(episodes+1)]
        plt.plot(np.arange(episodes+1), w_vals, alpha=0.5)
plt.xlabel('epsiode')
plt.ylabel('value')
plt.suptitle('output layer weight parameters', fontsize=20)
plt.grid()
plt.show()
# 出力層の重みパラメータの推移を作図
plt.figure(figsize=(12, 9), facecolor='white')
for a in range(agent.qnet.l2.b.data.shape[0]):
    b_vals = [trace_b2[i][a] for i in range(episodes+1)]
    plt.plot(np.arange(episodes+1), b_vals, alpha=1.0)
plt.xlabel('epsiode')
plt.ylabel('value')
plt.suptitle('output layer bias parameters', fontsize=20)
plt.grid()
plt.show()


入力層のパラメータの推移

出力層のパラメータの推移

 (重みの方は更新していないように見えますが、よく見ると始めの頃に少し変化しています。)

 この節では、Q学習を2層のニューラルネットワークを用いて実装しました。またこの章では、ニューラルネットワークの学習を確認しました。次章では、より本格的な強化学習におけるニューラルネットワークを実装します。

参考文献


おわりに

 7章終了です。あれもこれもと色々追加してしまって文字数が増えてしまいましたが、頑張って読んでください。ちなみに、コードや出力にLaTeXコマンドも含めてだけど5万文字弱です。

 特に理由はないですが、そうだ!KICKBACKを聴きましょうか。

 努力 未来 A BEAUTIFUL STAR!

【次節の内容】

www.anarchive-beta.com