RNN によるテキスト生成

このチュートリアルでは、文字ベースの RNN を使ってテキストを生成する方法を示します。ここでは、Andrej Karpathy の The Unreasonable Effectiveness of Recurrent Neural Networks からのシェイクスピア作品のデータセットを使います。このデータからの文字列("Shakespear")を入力にして、文字列中の次の文字("e")を予測するモデルを訓練します。このモデルを繰り返し呼び出すことで、より長い文字列を生成することができます。

このチュートリアルには、tf.keraseager execution を使ったコードが含まれています。下記は、このチュートリアルのモデルを 30 エポック訓練したものに対して、文字列 "Q" を初期値とした場合の出力例です。

I had thought thou hadst a Roman; for the oracle,
Thus by All bids the man against the word,
Which are so weak of care, by old care done;
Your children were in your holy love,
And the precipitation through the bleeding throne.

Marry, and will, my lord, to weep in such a one were prettiest;
Yet now I was adopted heir
Of the world's lamentable day,
To watch the next way with his father with his face?

The cause why then we are all resolved more sons.

O, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, it is no sin it should be dead,
And love and pale as any will to that word.

But how long have I heard the soul for this world,
And show his hands of life be proved to stand.

I say he look'd on, if I must be content
To stay him from the fatal of our country's bliss.
His lordship pluck'd from this sentence then for prey,
And then let us twain, being the moon,
were she such a case as fills m


  • このモデルは文字ベースです。訓練が始まった時に、モデルは英語の単語のスペルも知りませんし、単語がテキストの単位であることも知らないのです。

  • 出力の構造は戯曲に似ています。だいたいのばあい、データセットとおなじ大文字で書かれた話し手の名前で始まっています。

  • 以下に示すように、モデルはテキストの小さなバッチ(各100文字)で訓練されていますが、一貫した構造のより長いテキストのシーケンスを生成できます。


TensorFlow 等のライブラリインポート

import tensorflow as tf

import numpy as np
import os
import time
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')



# 読み込んだのち、Python 2 との互換性のためにデコード
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')
# テキストの長さは含まれる文字数
print ('Length of text: {} characters'.format(len(text)))
Length of text: 1115394 characters
# テキストの最初の 250文字を参照
First Citizen:
Before we proceed any further, hear me speak.

Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.
# ファイル中のユニークな文字の数
vocab = sorted(set(text))
print ('{} unique characters'.format(len(vocab)))
65 unique characters




# それぞれの文字からインデックスへの対応表を作成
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

text_as_int = np.array([char2idx[c] for c in text])

これで、それぞれの文字を整数で表現できました。文字を、0 からlen(unique) までのインデックスに変換していることに注意してください。

for char,_ in zip(char2idx, range(20)):
    print('  {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print('  ...\n}')
  '\n':   0,
  ' ' :   1,
  '!' :   2,
  '$' :   3,
  '&' :   4,
  "'" :   5,
  ',' :   6,
  '-' :   7,
  '.' :   8,
  '3' :   9,
  ':' :  10,
  ';' :  11,
  '?' :  12,
  'A' :  13,
  'B' :  14,
  'C' :  15,
  'D' :  16,
  'E' :  17,
  'F' :  18,
  'G' :  19,
# テキストの最初の 13 文字がどのように整数に変換されるかを見てみる
print ('{} ---- characters mapped to int ---- > {}'.format(repr(text[:13]), text_as_int[:13]))
'First Citizen' ---- characters mapped to int ---- > [18 47 56 57 58  1 15 47 58 47 64 43 52]



RNN はすでに見た要素に基づく内部状態を保持しているため、この時点までに計算されたすべての文字を考えると、次の文字は何でしょうか?


つぎに、テキストをサンプルシーケンスに分割します。それぞれの入力シーケンスは、元のテキストからの seq_length 個の文字を含みます。


そのため、テキストを seq_length+1 のかたまりに分割します。たとえば、 seq_length が 4 で、テキストが "Hello" だとします。入力シーケンスは "Hell" で、ターゲットシーケンスは "ello" となります。

これを行うために、最初に tf.data.Dataset.from_tensor_slices 関数を使ってテキストベクトルを文字インデックスの連続に変換します。

# ひとつの入力としたいシーケンスの文字数としての最大の長さ
seq_length = 100
examples_per_epoch = len(text)//(seq_length+1)

# 訓練用サンプルとターゲットを作る
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)

for i in char_dataset.take(5):

batch メソッドを使うと、個々の文字を求める長さのシーケンスに簡単に変換できます。

sequences = char_dataset.batch(seq_length+1, drop_remainder=True)

for item in sequences.take(5):
'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'

シーケンスそれぞれに対して、map メソッドを使って各バッチに単純な関数を適用することで、複製とシフトを行い、入力テキストとターゲットテキストを生成します。

def split_input_target(chunk):
    input_text = chunk[:-1]
    target_text = chunk[1:]
    return input_text, target_text

dataset = sequences.map(split_input_target)


for input_example, target_example in  dataset.take(1):
  print ('Input data: ', repr(''.join(idx2char[input_example.numpy()])))
  print ('Target data:', repr(''.join(idx2char[target_example.numpy()])))
Input data:  'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
Target data: 'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '

これらのベクトルのインデックスそれぞれが一つのタイムステップとして処理されます。タイムステップ 0 の入力として、モデルは "F" のインデックスを受け取り、次の文字として "i" のインデックスを予測しようとします。次のタイムステップでもおなじことをしますが、RNN は現在の入力文字に加えて、過去のステップのコンテキストも考慮します。

for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
    print("Step {:4d}".format(i))
    print("  input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
    print("  expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))
Step    0
  input: 18 ('F')
  expected output: 47 ('i')
Step    1
  input: 47 ('i')
  expected output: 56 ('r')
Step    2
  input: 56 ('r')
  expected output: 57 ('s')
Step    3
  input: 57 ('s')
  expected output: 58 ('t')
Step    4
  input: 58 ('t')
  expected output: 1 (' ')


tf.data を使ってテキストを分割し、扱いやすいシーケンスにします。しかし、このデータをモデルに供給する前に、データをシャッフルしてバッチにまとめる必要があります。

# バッチサイズ

# データセットをシャッフルするためのバッファサイズ
# (TF data は可能性として無限長のシーケンスでも使えるように設計されています。
# このため、シーケンス全体をメモリ内でシャッフルしようとはしません。
# その代わりに、要素をシャッフルするためのバッファを保持しています)

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

<BatchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>


tf.keras.Sequential を使ってモデルを定義します。この簡単な例では、モデルの定義に3つのレイヤーを使用しています。

  • tf.keras.layers.Embedding: 入力レイヤー。それぞれの文字を表す数を embedding_dim 次元のベクトルに変換する、訓練可能な参照テーブル。
  • tf.keras.layers.GRU: サイズが units=rnn_units のRNNの一種(ここに LSTM レイヤーを使うこともできる)。
  • tf.keras.layers.Dense: vocab_size の出力を持つ、出力レイヤー。
# 文字数で表されるボキャブラリーの長さ
vocab_size = len(vocab)

# 埋め込みベクトルの次元
embedding_dim = 256

# RNN ユニットの数
rnn_units = 1024
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  model = tf.keras.Sequential([
    tf.keras.layers.Embedding(vocab_size, embedding_dim,
                              batch_input_shape=[batch_size, None]),
  return model
model = build_model(
  vocab_size = len(vocab),

1文字ごとにモデルは埋め込みベクトルを検索し、その埋め込みベクトルを入力として GRU を 1 タイムステップ実行します。そして Dense レイヤーを適用して、次の文字の対数尤度を予測するロジットを生成します。

最初に、出力の shape を確認します。

for input_example_batch, target_example_batch in dataset.take(1):
  example_batch_predictions = model(input_example_batch)
  print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")
(64, 100, 65) # (batch_size, sequence_length, vocab_size)

上記の例では、入力のシーケンスの長さは 100 ですが、モデルはどのような長さの入力でも実行できます。

Model: "sequential"
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (64, None, 256)           16640     
 gru (GRU)                   (64, None, 1024)          3938304   
 dense (Dense)               (64, None, 65)            66625     
Total params: 4,021,569
Trainable params: 4,021,569
Non-trainable params: 0



sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()


array([29,  4, 59, 43,  5, 52, 39,  4, 29,  6, 21, 50, 32, 36, 16,  5, 60,
        0, 43, 55, 25,  3, 14, 15, 39, 35,  6,  5, 22,  0, 44, 33,  1, 29,
       43, 17,  5, 54, 38,  6, 48, 40, 16, 49, 53, 15, 29, 51,  9,  8, 26,
       51, 60, 12,  6, 35, 12, 18, 52, 52, 45, 54, 13, 42, 12, 15, 19,  6,
       45,  7, 63,  7, 44, 23,  3, 26, 10, 54, 62, 36, 54, 22, 28,  5,  5,
       16, 32, 27, 57, 48,  4, 58,  0, 15, 34, 52, 35, 20, 50, 14])


print("Input: \n", repr("".join(idx2char[input_example_batch[0]])))
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices ])))
 'uy and sell so, so give alms,\nPray so; and, for the ordering your affairs,\nTo sing them too: when yo'

Next Char Predictions: 
 "Q&ue'na&Q,IlTXD'v\neqM\\(BCaW,'J\nfU QeE'pZ,jbDkoCQm3.Nmv?,W?FnngpAd?CG,g-y-fK\\)N:pxXpJP''DTOsj&t\nCVnWHlB"


ここまでくれば問題は標準的な分類問題として扱うことができます。これまでの RNN の状態と、いまのタイムステップの入力が与えられ、次の文字のクラスを予測します。


この場合、標準の tf.keras.losses.sparse_categorical_crossentropy 損失関数が使えます。予測の最後の次元に適用されるからです。

このモデルはロジットを返すので、from_logits フラグをセットする必要があります。

def loss(labels, logits):
  return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

example_batch_loss  = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss:      ", example_batch_loss.numpy().mean())
Prediction shape:  (64, 100, 65)  # (batch_size, sequence_length, vocab_size)
scalar_loss:       4.173783

tf.keras.Model.compile を使って、訓練手順を定義します。既定の引数を持った tf.keras.optimizers.Adam と、先ほどの loss 関数を使用しましょう。

model.compile(optimizer='adam', loss=loss)


tf.keras.callbacks.ModelCheckpoint を使って、訓練中にチェックポイントを保存するようにします。

# チェックポイントが保存されるディレクトリ
checkpoint_dir = './training_checkpoints'
# チェックポイントファイルの名称
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")



訓練時間を適切に保つために、10エポックを使用してモデルを訓練します。Google Colab を使用する場合には、訓練を高速化するためにランタイムを GPU に設定します。

history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])
Epoch 1/10
172/172 [==============================] - 10s 42ms/step - loss: 2.6739
Epoch 2/10
172/172 [==============================] - 8s 38ms/step - loss: 1.9617
Epoch 3/10
172/172 [==============================] - 8s 38ms/step - loss: 1.6931
Epoch 4/10
172/172 [==============================] - 8s 38ms/step - loss: 1.5456
Epoch 5/10
172/172 [==============================] - 8s 38ms/step - loss: 1.4564
Epoch 6/10
172/172 [==============================] - 8s 38ms/step - loss: 1.3969
Epoch 7/10
172/172 [==============================] - 8s 38ms/step - loss: 1.3516
Epoch 8/10
172/172 [==============================] - 8s 38ms/step - loss: 1.3123
Epoch 9/10
172/172 [==============================] - 8s 38ms/step - loss: 1.2789
Epoch 10/10
172/172 [==============================] - 8s 38ms/step - loss: 1.2475



予測ステップを単純にするため、バッチサイズ 1 を使用します。

RNN が状態をタイムステップからタイムステップへと渡す仕組みのため、モデルは一度構築されると固定されたバッチサイズしか受け付けられません。

モデルを異なる batch_size で実行するためには、モデルを再構築し、チェックポイントから重みを復元する必要があります。

model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)


model.build(tf.TensorShape([1, None]))
Model: "sequential_1"
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (1, None, 256)            16640     
 gru_1 (GRU)                 (1, None, 1024)           3938304   
 dense_1 (Dense)             (1, None, 65)             66625     
Total params: 4,021,569
Trainable params: 4,021,569
Non-trainable params: 0



  • 最初に、開始文字列を選択し、RNN の状態を初期化して、生成する文字数を設定します。

  • 開始文字列と RNN の状態を使って、次の文字の予測分布を得ます。

  • つぎに、カテゴリー分布を使用して、予測された文字のインデックスを計算します。この予測された文字をモデルの次の入力にします。

  • モデルによって返された RNN の状態はモデルにフィードバックされるため、1つの文字だけでなく、より多くのコンテキストを持つことになります。つぎの文字を予測した後、更新された RNN の状態が再びモデルにフィードバックされます。こうしてモデルは以前に予測した文字からさらにコンテキストを得ることで学習するのです。

def generate_text(model, start_string):
  # 評価ステップ(学習済みモデルを使ったテキスト生成)

  # 生成する文字数
  num_generate = 1000

  # 開始文字列を数値に変換(ベクトル化)
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)

  # 結果を保存する空文字列
  text_generated = []

  # 低い temperature は、より予測しやすいテキストをもたらし
  # 高い temperature は、より意外なテキストをもたらす
  # 実験により最適な設定を見つけること
  temperature = 1.0

  # ここではバッチサイズ == 1
  for i in range(num_generate):
      predictions = model(input_eval)
      # バッチの次元を削除
      predictions = tf.squeeze(predictions, 0)

      # カテゴリー分布をつかってモデルから返された文字を予測 
      predictions = predictions / temperature
      predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

      # 過去の隠れ状態とともに予測された文字をモデルへのつぎの入力として渡す
      input_eval = tf.expand_dims([predicted_id], 0)


  return (start_string + ''.join(text_generated))
print(generate_text(model, start_string=u"ROMEO: "))
ROMEO: I am a miss'd.
But sleep forth. Be,
To chose by gailting than thy daughter's mind
Good queen, my lord.

No, I'll proceed.

Put out an egry cape
Would have the wars friar'dly,
Hath colouris contemplisans.

Ay, the time thou but well, not for't.
Both living fortune waste with it forswore,
Thanks, and all to thine eye,
We shall have, she was rund. Hil away?

Here canst thou never being to use ie fest
beloved o' the court-world and weakness,--

Scorn, fair wetch your good company: masters, call'd up,
As call you beheld, lord.

He should do giving the plishes; these fellows a man,
Nay, let me weep
Than come upon his majesty! By bright-advice they but a fool,
And fetch itsell landu-doom here alive the day Tyrrel,
God knoces him for a very more voices.
We say, a thousand-Face that ever I am about to.

So lie, the lust, the pant murder? is therely earth.

Is, palket chargely forget,
When I go; go, come, or a better d

この結果を改善するもっとも簡単な方法は、もっと長く訓練することです(EPOCHS=30 を試してみましょう)。

また、異なる初期文字列を使ったり、モデルの精度を向上させるためにもうひとつ RNN レイヤーを加えたり、temperature パラメータを調整して、よりランダム性の強い、あるいは、弱い予測を試してみたりすることができます。

上級編: 訓練のカスタマイズ


モデルを手動で実行する方法を見てきたので、訓練ループを展開し、自分で実装してみましょう。このことが、たとえばモデルのオープンループによる出力を安定化するための カリキュラム学習 を実装するための出発点になります。

勾配を追跡するために tf.GradientTape を使用します。このアプローチについての詳細を学ぶには、 eager execution guide をお読みください。


  • 最初に、RNN の状態を初期化する。tf.keras.Model.reset_states メソッドを呼び出すことでこれを実行する。

  • つぎに、(1バッチずつ)データセットを順番に処理し、それぞれのバッチに対する予測値を計算する。

  • tf.GradientTape をオープンし、そのコンテキストで、予測値と損失を計算する。

  • tf.GradientTape.grads メソッドを使って、モデルの変数に対する損失の勾配を計算する。

  • 最後に、オプティマイザの tf.train.Optimizer.apply_gradients メソッドを使って、逆方向の処理を行う。

model = build_model(
  vocab_size = len(vocab),
optimizer = tf.keras.optimizers.Adam()
def train_step(inp, target):
  with tf.GradientTape() as tape:
    predictions = model(inp)
    loss = tf.reduce_mean(
            target, predictions, from_logits=True))
  grads = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  return loss
# 訓練ステップ

for epoch in range(EPOCHS):
  start = time.time()

  # 各エポックの最初に、隠れ状態を初期化する
  # 最初は隠れ状態は None
  hidden = model.reset_states()

  for (batch_n, (inp, target)) in enumerate(dataset):
    loss = train_step(inp, target)

    if batch_n % 100 == 0:
      template = 'Epoch {} Batch {} Loss {}'
      print(template.format(epoch+1, batch_n, loss))

  # 5エポックごとにモデル(のチェックポイント)を保存する
  if (epoch + 1) % 5 == 0:

  print ('Epoch {} Loss {:.4f}'.format(epoch+1, loss))
  print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 4.174540996551514
Epoch 1 Batch 100 Loss 2.370338201522827
Epoch 1 Loss 2.1595
Time taken for 1 epoch 9.26038932800293 sec

Epoch 2 Batch 0 Loss 2.1852879524230957
Epoch 2 Batch 100 Loss 1.9203600883483887
Epoch 2 Loss 1.8065
Time taken for 1 epoch 7.190315246582031 sec

Epoch 3 Batch 0 Loss 1.8183932304382324
Epoch 3 Batch 100 Loss 1.669272780418396
Epoch 3 Loss 1.6125
Time taken for 1 epoch 7.355443239212036 sec

Epoch 4 Batch 0 Loss 1.5592581033706665
Epoch 4 Batch 100 Loss 1.5509053468704224
Epoch 4 Loss 1.5413
Time taken for 1 epoch 7.222464084625244 sec

Epoch 5 Batch 0 Loss 1.4591556787490845
Epoch 5 Batch 100 Loss 1.470933198928833
Epoch 5 Loss 1.4629
Time taken for 1 epoch 7.2957847118377686 sec

Epoch 6 Batch 0 Loss 1.4055699110031128
Epoch 6 Batch 100 Loss 1.3990877866744995
Epoch 6 Loss 1.3728
Time taken for 1 epoch 7.287255764007568 sec

Epoch 7 Batch 0 Loss 1.3341516256332397
Epoch 7 Batch 100 Loss 1.3645275831222534
Epoch 7 Loss 1.3426
Time taken for 1 epoch 7.1413867473602295 sec

Epoch 8 Batch 0 Loss 1.2572031021118164
Epoch 8 Batch 100 Loss 1.3154652118682861
Epoch 8 Loss 1.2930
Time taken for 1 epoch 7.195223808288574 sec

Epoch 9 Batch 0 Loss 1.2512826919555664
Epoch 9 Batch 100 Loss 1.3123900890350342
Epoch 9 Loss 1.3117
Time taken for 1 epoch 7.14519190788269 sec

Epoch 10 Batch 0 Loss 1.1898956298828125
Epoch 10 Batch 100 Loss 1.2471652030944824
Epoch 10 Loss 1.2674
Time taken for 1 epoch 7.228768825531006 sec