からっぽのしょこ

読んだら書く!書いたら読む!読書読読書読書♪同じ事は二度調べ(たく)ない

4.2.6-7:Negative Samplingの実装【ゼロつく2のノート(実装)】

はじめに

 『ゼロから作るDeep Learning 2――自然言語処理編』の初学者向け【実装】攻略ノートです。『ゼロつく2』学習の補助となるように適宜解説を加えています。本と一緒に読んでください。

 本の内容を1つずつ確認しながらゆっくりと組んでいきます。

 この記事は、 4.2.6項「Negative Sampling」と4.2.7項「Negative Samplingの実装」の内容です。負例をサンプリングして損失に含めるNegative Sampling Lossレイヤを説明して、Pythonで実装します。

【前節の内容】

www.anarchive-beta.com

【他の節の内容】

www.anarchive-beta.com

【この節の内容】

4.2.6 Negative Samplingのサンプリング手法

 出力層にEmbedding Dotレイヤを用いることで、「正解の単語」のみを取り出して二値分類として推論するのでした。そのため、「正解の単語」に関する重みだけが学習を行うことになります。そこで、いくつかの「不正解の単語」をランダムに取り出して推論することにします。
 この「不正解の単語(負例)」をサンプリングすることをNegative Samplingと呼びます。

 この項では、Negative Samplingで用いるサンプラー(Unigram Sampler)を実装します。

  各単語のサンプリング確率は、コーパス内での出現頻度に応じて決めることにします。これは、コーパスによく出現する単語ほど重要度が高いと考え、また需要度に応じて学習を行う頻度を増やすためです。そこでコーパス全体における各単語の出現回数の割合を、その単語のサンプリング確率とします。

# 利用するライブラリをインポート
import numpy as np
import collections


・処理の確認

 各単語の出現割合を求めるために、まずは各単語の出現回数を求めます。

 collectionsライブラリのCounterクラスを利用して、各単語の出現回数をカウントします。
 まずはfor文を使って、4.1.2項で作成したコーパスcorpusから1語ずつ単語IDを取り出します。そして単語IDをCounterクラスのインスタンスcountsの添字として、その単語IDの要素に1を加算していくことでカウントします。

# 受け皿(インスタンス)を作成
counts = collections.Counter()

# 1語ずつカウント
for word_id in corpus:
    counts[word_id] += 1

print(counts)

# 語彙数を取得
vocab_size = len(counts)
print(vocab_size)
Counter({1: 2, 0: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1})
7

 語彙数は当然len(word_to_id)と同じ結果になります。

 次に、各単語の出現回数を確率に変換します。

 語彙数と同じ要素数のNumPy配列を作成して、そこに各単語の出現回数をループ処理で移します。

 各単語の出現回数の割合を確率とすると言いましたが、0.75乗して調整した値を確率として用いることにします。累乗の計算には、np.power()を使います。
 調整した各要素の値を、全体の和で割ることで割合を求められます。

# NumPy配列のカウントを作成
word_p = np.zeros(vocab_size)
for i in range(vocab_size):
    word_p[i] = counts[i]
print(word_p)

# 調整係数を指定
power = 0.75

# 値を調整:式(4.4)の分子
word_p = np.power(word_p, power)
print(np.round(word_p, 2))

# 確率に変換(正規化):式(4.4)
word_p /= np.sum(word_p)
print(np.round(word_p, 2))
print(np.sum(word_p))
[1. 2. 1. 1. 1. 1. 1.]
[1.   1.68 1.   1.   1.   1.   1.  ]
[0.13 0.22 0.13 0.13 0.13 0.13 0.13]
1.0000000000000002

 厳密には、式(4.4)を次のように変換した最後の行の計算をしています。

$$ \begin{align} P'(w_i) &= \frac{P(w_i)^{0.75}}{\sum_{i=0}^6 P(w_i)^{0.75}} \tag{4.4}\\ &= \frac{ \frac{C(w_i)^{0.75}}{N^{0.75}} }{ \sum_{i=0}^6 \frac{C(w_i)^{0.75}}{N^{0.75}} } \\ &= \frac{C(w_i)^{0.75}}{N^{0.75}} \sum_{i=0}^6 \frac{N^{0.75}}{C(w_i)^{0.75}} \\ &= \frac{C(w_i)^{0.75}}{N^{0.75}} N^{0.75} \frac{1}{\sum_{i=0}^6 C(w_i)^{0.75}} \\ &= \frac{C(w_i)^{0.75}}{\sum_{i=0}^6 C(w_i)^{0.75}} \end{align} $$

 ここで$C(w_i)$は単語$w_i$の出現回数で、$N = \sum_{i=0}^6 C(w_i)$は総単語数です。また$P(w_i) = \frac{C(w_i)}{N}$は単語IDが$i$の単語の確率で、$P'(w_i)$は調整後の確率です。

 各単語のサンプリング確率が得られたので、サンプリングを行います。

 1回のサンプリングにおいて抽出する単語数をサンプルサイズsample_size、サンプリングをおこなう回数をバッチサイズbatch_sizeとします。バッチサイズは入力データの数(ターゲット数)です。

 サンプリングした単語(単語ID)の受け皿をnegative_sampleとします。negative_sampleの形状は、batch_sizesample_size列の2次元配列です。
 サンプリングには、np.random.choice()を使います。

 Negative Samplingでは「不正解の単語」をサンプリングするのでした。つまり「正解の単語(ターゲットの単語)」はサンプリングしません。そのため、ターゲットの単語のサンプリング確率を0に置き換える必要があります。また置き換えた後に、全体の和が1になるように正規化する必要があります。

# サンプルサイズを指定
sample_size = 3

# バッチサイズ(ターゲット数)を取得
batch_size = target.shape[0]

# サンプリング
negative_sample = np.zeros((batch_size, sample_size), dtype=np.int32) # サンプルの受け皿を初期化
for i in range(batch_size):
    # 各単語のサンプリング確率を取得
    p = word_p.copy()
    
    # ターゲット自体の単語の確率を0にする
    target_idx = target[i] # ターゲットのインデックスを取得
    p[target_idx] = 0 # ターゲットの単語の確率を0にする
    p /= p.sum() # 再正規化
    
    # サンプリング
    negative_sample[i, :] = np.random.choice(vocab_size, size=sample_size, replace=False, p=p)

print(negative_sample)
print(target)
[[0 3 6]
 [0 6 1]
 [5 1 0]
 [6 5 1]
 [3 6 4]
 [6 0 1]]
[1 2 3 4 1 5]

 この出力がサンプリングした単語のIDになります。行ごとに単語IDが重複せず、またターゲット自体の単語IDも含まず単語IDが生成されます。

・実装

 処理の確認ができたので、Unigram Samplerをクラスとして実装します。

# 利用するライブラリをインポート
import collections

# 負例のサンプラーの実装
class UnigramSampler:
    
    # 初期化メソッドの定義
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size # サンプリングする単語数
        self.vocab_size = None # 語彙数
        self.word_p = None # 単語ごとのサンプリング確率
        
        # 出現回数をカウント
        counts = collections.Counter() # 受け皿を初期化
        for word_id in corpus:
            counts[word_id] += 1
        
        # 語彙数を保存
        vocab_size = len(counts)
        self.vocab_size = vocab_size
        
        # 出現回数からサンプリング確率を計算
        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i] # カウントを移す
        self.word_p = np.power(self.word_p, power) # 値を調整:式(4.4)の分子
        self.word_p /= np.sum(self.word_p) # 確率に変換(割合を計算):式(4.4)
    
    # サンプリングメソッドの定義
    def get_negative_sample(self, target):
        # バッチサイズ(ターゲット数)を取得
        batch_size = target.shape[0]

        # サンプリング
        negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32) # 受け皿を初期化
        for i in range(batch_size):
            # 単語ごとのサンプリング確率を取得
            p = self.word_p.copy()
            
            # ターゲット自体の単語の確率を0にする
            target_idx = target[i] # ターゲットのインデックスを取得
            p[target_idx] = 0 # ターゲットの単語の確率を0にする
            p /= p.sum() # 再正規化
            
            # サンプリング
            negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
        
        return negative_sample


 実装したクラスを試してみましょう。

# サンプルサイズを指定
sample_size = 3

# インスタンスを作成
sampler = UnigramSampler(corpus, power=0.75, sample_size=sample_size)

# サンプリング
negative_sample = sampler.get_negative_sample(target)
print(negative_sample)
print(target)
[[5 0 2]
 [4 5 0]
 [5 0 4]
 [3 1 6]
 [2 4 5]
 [4 1 6]]
[1 2 3 4 1 5]

 行ごとに単語IDが重複せず、またターゲット自体の単語IDも含まず単語IDが生成されれば実装完了です。

 以上でサンプラーを実装できました。次はこのサンプラーを用いてNegative Samplingを実装します。

4.2.7 Negative Samplingの実装

 Negative Sampling、出力層、損失層の処理を行うクラスを実装します。出力層(Embedding Dotレイヤ)は4.2.4項損失層(Sigmoid with Lossレイヤ)は4.2.3項で実装しました。

・処理の確認

 リスト内包表記を使って、サンプルサイズ個のインスタンスを格納するリストを作成します。リスト内でfor文を使うことで、インスタンスの作成をサンプルサイズ回行いますが、そのときrange()から出力される値自体は利用しません。

# サンプルサイズを指定
sample_size = 3

# サンプラーのインスタンスを作成
sampler = UnigramSampler(corpus=corpus, power=0.75, sample_size=sample_size)

# 出力層の重みをランダムに生成
W_out = np.random.randn(vocab_size, hidden_size)
print(W_out.shape)

# Embed Dotレイヤのインスタンスを作成
embed_dot_layers = [EmbeddingDot(W_out) for _ in range(sample_size + 1)]

# Sigmoid with Lossレイヤのインスタンスを作成
loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
(7, 3)


 「正解の単語(正例)」の損失を求めます。正例の処理については、4.2.3項を参照してください。

 正例の処理には、リストに格納されている0番目のインスタンスを使います。

## 正例の損失を計算

# スコアを計算
score = embed_dot_layers[0].forward(h, target)
print(score)

# 正解ラベルを作成
correct_label = np.ones(batch_size, dtype=np.int32)

# 損失を計算
loss = loss_layers[0].forward(score, correct_label)
print(loss)
[-0.71901268  0.48142814  0.51956035  1.20755051 -0.90154502  0.66974349]
0.6634857919052505

 この損失lossに、負例の損失を加えます。

 続いて、サンプリングした「不正解の単語(負例)」の損失を計算し、lossに加算していきます。

 正例の処理において、正解ラベルが全て1の教師ラベルcorrect_labelを作成しました。不正解のラベル(インデックス)は0です。そこで全ての要素が0の配列を作成して、それを不正解のインデックスを示す教師ラベルnegative_labelとします。

 負例の処理では、リストの1番目からのインスタンスを使ってスコアと損失を計算します。

## 負例の損失を加算

# 負例をサンプリング
negative_sample = sampler.get_negative_sample(target)

# 負正解ラベルを作成
negative_label = np.zeros(batch_size, dtype=np.int32)

# サンプルごとに損失を加算
for i in range(sample_size):
    
    # 負例のサンプルを取得
    negative_target = negative_sample[:, i]
    
    # スコアを計算
    score = embed_dot_layers[1 + i].forward(h, negative_target)
    
    # 損失を加算
    loss += loss_layers[1 + i].forward(score, negative_label)
    
    # 処理の確認用に各サンプルの損失を表示
    print(loss)
1.2956572166862368
2.3094703754653034
3.236049914289778

 負例の処理では、サンプリングされた単語がターゲットの単語である確率$y_{nk}$と、サンプリングされた単語に対応する教師ラベルの要素$t_{nk} = 0$との交差エントロピー誤差を求めています。正解ではない単語に関する推論結果なので、$y_{nk}$は0に近いほどよく推論できていることになります。つまり$y_{nk}$の値が0に近づくほど、損失も小さくなります。

 ここまでが、順伝播で行う処理になります。

 続いて、逆伝播で行う処理を確認します。

 逆伝播の入力dLは$\frac{\partial L}{\partial L} = 1$です。

 dLは、各サンプルのSigmoid with Lossレイヤに伝播し、各サンプルのスコア$\mathbf{s}$に関する勾配$\frac{\partial L}{\partial \mathbf{s}}$(dscore)を計算します(4.2.3項)。

 dscoreは、各サンプルのEmbedding Dotレイヤに伝播し、サンプルごとに中間層のニューロン$\mathbf{h}$に関する勾配$\frac{\partial L}{\partial \mathbf{h}} = \frac{\partial L}{\partial \mathbf{s}} \frac{\partial \mathbf{s}}{\partial \mathbf{h}}$(dh)を計算します(4.2.2項)。

 図4-17からも分かる通り、$\mathbf{h}$は各サンプルのEmbedding Dotレイヤに(順)伝播しています。つまりRepeatノード(1.3.4.3項)の働きをしています。よってRepeatの逆伝播と同様に、全てのサンプルの勾配dhの和をとります。

 各レイヤの勾配計算は、各レイヤのクラスの逆伝播メソッドで行えます。

# 順伝播の初期値
dL = 1

# 勾配を初期化
dh = 0

# サンプルごとに逆伝播メソッドを実行
for layer0, layer1 in zip(loss_layers, embed_dot_layers):
    # スコアに関する勾配を計算
    dscore = layer0.backward(dL)
    
    # 中間層のニューロンに関する勾配を加算
    dh += layer1.backward(dscore)
    
    # 処理の確認用に各サンプルの勾配を表示
    print(np.round(dh, 2))
[[-0.07  0.15 -0.01]
 [ 0.02  0.04 -0.02]
 [-0.01  0.07  0.13]
 [ 0.02  0.06 -0.03]
 [-0.07  0.16 -0.01]
 [ 0.07 -0.04  0.  ]]
[[-0.07  0.14 -0.  ]
 [-0.01 -0.05 -0.12]
 [-0.1   0.12  0.13]
 [ 0.05  0.06 -0.05]
 [-0.05  0.03 -0.23]
 [ 0.11 -0.13  0.01]]
[[-0.03  0.13 -0.03]
 [ 0.07 -0.22 -0.11]
 [-0.07  0.05  0.13]
 [ 0.13 -0.12 -0.05]
 [-0.19  0.11 -0.24]
 [ 0.12 -0.2  -0.12]]
[[-0.01 -0.03 -0.34]
 [ 0.09 -0.33 -0.3 ]
 [-0.09 -0.01  0.07]
 [ 0.1  -0.21 -0.14]
 [-0.21  0.05 -0.21]
 [ 0.06 -0.35 -0.05]]

 計算結果のdhは、Embeddingレイヤ(4.1.1項)に伝播します。

・実装

 処理の確認ができたので、Negative Samplingを行いEmbedding Dotレイヤ(出力層)とSigmoid with Lossレイヤ(損失層)の計算を行う処理をクラスとして実装します。

# サンプリングと損失計算の実装
class NegativeSamplingLoss:
    
    # 初期化メソッドの定義
    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size # サンプリングサイズ
        self.sampler = UnigramSampler(corpus, power, sample_size)  # サンプラー
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]    # 損失レイヤ
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)] # Embed Dotレイヤ
        
        # 各レイヤのパラメータと勾配を格納
        self.params = [] # パラメータ
        self.grads = []  # 勾配
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads
    
    # 順伝播メソッドの定義
    def forward(self, h, target):
        # バッチサイズ(ターゲット数)を取得
        batch_size = target.shape[0]
        
        # サンプリング
        negative_sample = self.sampler.get_negative_sample(target)
        
        # 正例の損失を計算
        score = self.embed_dot_layers[0].forward(h, target) # スコア
        correct_label = np.ones(batch_size, dtype=np.int32) # 教師ラベル
        loss = self.loss_layers[0].forward(score, correct_label) # 損失
        
        # 負例の損失を加算
        negative_label = np.zeros(batch_size, dtype=np.int32) # 教師ラベル
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i] # 負例のサンプル
            score = self.embed_dot_layers[1 + i].forward(h, negative_target) # スコア
            loss += self.loss_layers[1 + i].forward(score, negative_label)   # 損失
        
        return loss
    
    # 逆伝播メソッドの定義
    def backward(self, dout=1):
        # 勾配を初期化
        dh = 0
        
        # 順番に逆伝播メソッドを実行
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = l0.backward(dout) # スコア
            dh += l1.backward(dscore)  # 中間層のニューロン
        
        return dh


 逆伝播メソッドにおいてzipを使います。zip()を使うことで、複数のリストを同時にfor文で扱えます。

# リストを作成
menber_names = ['yuka', 'tomoko', 'sayuki', 'karin', 'akari']
menber_colors = ['peach', 'apple', 'lemon', 'grape', 'melon']

# それぞれのリストから要素を1つずつ出力
for name, color in zip(menber_names, menber_colors):
    print(name + ':' + color)
yuka:peach
tomoko:apple
sayuki:lemon
karin:grape
akari:melon


 実装したクラスを試してみましょう。

 インスタンスを作成して、順伝播の処理を実行します。

# サンプルサイズを指定
sample_size = 3

# インスタンスを作成
ns_loss_layer = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=sample_size)

# 順伝播メソッドの処理
loss = ns_loss_layer.forward(h, target)
print(loss)
2.8696459592003283

 負例も含めた損失を計算できました。

 続いて逆伝播の処理を実行します。

# 逆伝播の入力
dout = 1

# 逆伝播メソッドの処理
dh = ns_loss_layer.backward(dout)
print(dh)
print(dh.shape)
[[-0.0081478  -0.03470311 -0.33672272]
 [ 0.14428701 -0.23622157 -0.2265689 ]
 [-0.00782409 -0.07928057  0.07738105]
 [-0.09233067 -0.0741812  -0.08484709]
 [-0.04418219 -0.02933486 -0.22480874]
 [ 0.09612305 -0.27821205 -0.20346179]]
 (6, 3)


 ここまでで改良版CBOWモデルに必要なレイヤを全て作成できました。次節では改良版のCBOWモデルを実装します。

参考文献

  • 斎藤康毅『ゼロから作るDeep Learning 2――自然言語処理編』オライリー・ジャパン,2018年.

おわりに

 このシリーズは主に他人のために作り始めた資料だけど、具体的に自分の役に立ち始めたので、まぁやってて良かったなと思いました。最近進捗が悪くなってきてるけど、、

 10月10日は、Juice=Juiceの日!

 イベント行きたーい。ライブ行きたーい。

 ハロプロ記念日駆動執筆になりつつある。

【次節の内容】