ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
บทนำ
เมื่อคุณกำลังทำการเรียนรู้ภายใต้การดูแลคุณสามารถใช้ fit()
และทุกอย่างทำงานได้อย่างราบรื่น
เมื่อคุณต้องเขียนห่วงการฝึกอบรมของคุณเองตั้งแต่เริ่มต้นคุณสามารถใช้ GradientTape
และใช้การควบคุมทุกรายละเอียดเล็ก ๆ น้อย ๆ
แต่สิ่งที่ถ้าคุณต้องการขั้นตอนวิธีการฝึกอบรมที่กำหนดเอง แต่คุณยังคงต้องการที่จะได้รับประโยชน์จากการอำนวยความสะดวกของ fit()
เช่นการเรียกกลับตัวในการสนับสนุนการกระจายหรือขั้นตอนการหลอมละลาย?
หลักการหลักของ Keras คือการเปิดเผยความก้าวหน้าของความซับซ้อน คุณควรจะสามารถเข้าสู่เวิร์กโฟลว์ระดับล่างได้แบบค่อยเป็นค่อยไป คุณไม่ควรตกหน้าผาหากฟังก์ชันระดับสูงไม่ตรงกับกรณีการใช้งานของคุณทุกประการ คุณควรจะสามารถควบคุมรายละเอียดเล็ก ๆ น้อย ๆ ได้มากขึ้นในขณะที่ยังคงความสะดวกสบายระดับสูงไว้ในปริมาณที่พอเหมาะ
เมื่อคุณจำเป็นต้องปรับแต่งสิ่งที่ fit()
ไม่คุณควรแทนที่ฟังก์ชั่นขั้นตอนการฝึกอบรมของ Model
คลาส นี้เป็นฟังก์ชั่นที่เรียกได้ว่า fit()
สำหรับชุดของข้อมูลทุก จากนั้นคุณจะสามารถที่จะเรียก fit()
ตามปกติ - และมันจะทำงานขั้นตอนวิธีการเรียนรู้ของคุณเอง
โปรดทราบว่ารูปแบบนี้ไม่ได้ป้องกันคุณจากการสร้างแบบจำลองด้วย Functional API คุณสามารถทำเช่นนี้ไม่ว่าคุณกำลังสร้าง Sequential
รุ่นรุ่น API ฟังก์ชั่นหรือรุ่น subclassed
เรามาดูกันว่ามันทำงานอย่างไร
ติดตั้ง
ต้องใช้ TensorFlow 2.2 หรือใหม่กว่า
import tensorflow as tf
from tensorflow import keras
ตัวอย่างแรกง่ายๆ
เริ่มจากตัวอย่างง่ายๆ:
- เราสร้างคลาสใหม่ที่ subclasses
keras.Model
- เราเพียงแค่แทนที่วิธี
train_step(self, data)
- เราคืนค่าชื่อเมตริกการแมปพจนานุกรม (รวมถึงการสูญเสีย) เป็นค่าปัจจุบัน
อาร์กิวเมนต์การป้อน data
เป็นสิ่งที่ได้รับการส่งผ่านไปพอดีเป็นข้อมูลการฝึกอบรม:
- ถ้าคุณผ่านอาร์เรย์ Numpy โดยเรียก
fit(x, y, ...)
แล้วdata
จะ tuple(x, y)
- หากคุณผ่านการ
tf.data.Dataset
โดยเรียกfit(dataset, ...)
แล้วdata
จะเป็นสิ่งที่ได้รับผลจากdataset
ในแต่ละชุด
ในร่างกายของ train_step
วิธีการที่เราดำเนินการปรับปรุงการฝึกอบรมปกติคล้ายกับสิ่งที่คุณมีอยู่แล้วคุ้นเคยกับ ที่สำคัญเราคำนวณการสูญเสียผ่านทาง self.compiled_loss
ซึ่งตัดขาดทุน (e) ฟังก์ชั่น (s) ที่ถูกส่งผ่านไปยัง compile()
ในทำนองเดียวกันเราเรียก self.compiled_metrics.update_state(y, y_pred)
เพื่ออัปเดตสถานะของตัวชี้วัดที่ถูกส่งผ่านไปใน compile()
และเราผลการค้นหาจาก self.metrics
ด้านท้ายที่จะเรียกค่าปัจจุบันของพวกเขา
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
มาลองทำกัน:
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.9909 - mae: 0.8601 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5345 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.2906 - mae: 0.4311 <keras.callbacks.History at 0x7f5ad1ca1090>
ไปสู่ระดับล่าง
อย่างที่คุณก็สามารถข้ามผ่านฟังก์ชั่นการสูญเสียใน compile()
และแทนที่จะทำทุกอย่างด้วยตนเองใน train_step
ในทำนองเดียวกันสำหรับตัวชี้วัด
นี่เป็นตัวอย่างในระดับต่ำกว่าที่จะใช้ compile()
การกำหนดค่าการเพิ่มประสิทธิภาพ:
- เราเริ่มต้นด้วยการสร้าง
Metric
กรณีการติดตามและการสูญเสียคะแนนแม่ของเรา - เราใช้เอง
train_step()
ที่ปรับปรุงสถานะของตัวชี้วัดเหล่านี้ (โดยการเรียกupdate_state()
ที่พวกเขา) แล้วสอบถามพวกเขา (ผ่านresult()
) เพื่อกลับค่าเฉลี่ยปัจจุบันของพวกเขาที่จะแสดงโดยแถบความคืบหน้าและจะเป็น ผ่านไปยังการโทรกลับใด ๆ - โปรดทราบว่าเราจะต้องเรียก
reset_states()
ตัวชี้วัดของเราระหว่างแต่ละยุค! มิฉะนั้นการเรียกresult()
จะกลับมาเป็นค่าเฉลี่ยตั้งแต่จุดเริ่มต้นของการฝึกอบรมในขณะที่เรามักจะทำงานร่วมกับค่าเฉลี่ยต่อยุค โชคดีที่กรอบสามารถทำเช่นนั้นเรา: เพียงรายการตัวชี้วัดใด ๆ ที่คุณต้องการตั้งค่าในmetrics
คุณสมบัติของรูปแบบ รูปแบบจะเรียกreset_states()
บนวัตถุใด ๆ ไว้ที่นี่ที่จุดเริ่มต้นของแต่ละfit()
ยุคหรือที่จุดเริ่มต้นของการเรียกร้องให้มีevaluate()
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
class CustomModel(keras.Model):
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
loss_tracker.update_state(loss)
mae_metric.update_state(y, y_pred)
return {"loss": loss_tracker.result(), "mae": mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [loss_tracker, mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't passs a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5 32/32 [==============================] - 0s 1ms/step - loss: 1.5969 - mae: 1.1523 Epoch 2/5 32/32 [==============================] - 0s 1ms/step - loss: 0.7352 - mae: 0.7310 Epoch 3/5 32/32 [==============================] - 0s 1ms/step - loss: 0.3830 - mae: 0.4999 Epoch 4/5 32/32 [==============================] - 0s 1ms/step - loss: 0.2809 - mae: 0.4215 Epoch 5/5 32/32 [==============================] - 0s 1ms/step - loss: 0.2590 - mae: 0.4058 <keras.callbacks.History at 0x7f5ad1b62c50>
สนับสนุน sample_weight
& class_weight
คุณอาจสังเกตเห็นว่าตัวอย่างพื้นฐานแรกของเราไม่ได้กล่าวถึงการถ่วงน้ำหนักตัวอย่าง หากคุณต้องการที่จะสนับสนุน fit()
ข้อโต้แย้ง sample_weight
และ class_weight
คุณก็จะทำต่อไปนี้:
- แกะ
sample_weight
จากdata
การโต้แย้ง - ผ่านไป
compiled_loss
&compiled_metrics
(แน่นอนคุณยังสามารถเพียงแค่ใช้มันด้วยตนเองถ้าคุณไม่ต้องพึ่งพาcompile()
สำหรับความเสียหายและตัวชี้วัด) - แค่นั้นแหละ. นั่นคือรายการ
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1365 - mae: 0.4196 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1285 - mae: 0.4068 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1212 - mae: 0.3971 <keras.callbacks.History at 0x7f5ad1ba64d0>
ให้ขั้นตอนการประเมินของคุณเอง
เกิดอะไรขึ้นถ้าคุณต้องการที่จะทำเช่นเดียวกันสำหรับการโทรไป model.evaluate()
? แล้วคุณจะแทนที่ test_step
ในทางเดียวกันว่า นี่คือสิ่งที่ดูเหมือน:
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 2.7584 - mae: 1.5920 [2.758362054824829, 1.59201979637146]
สรุป: ตัวอย่าง GAN แบบ end-to-end
มาดูตัวอย่างแบบ end-to-end ที่ใช้ประโยชน์จากทุกสิ่งที่คุณเพิ่งเรียนรู้กัน
ลองพิจารณา:
- เครือข่ายตัวสร้างหมายถึงการสร้างภาพขนาด 28x28x1
- เครือข่าย discriminator หมายถึงการจำแนกรูปภาพขนาด 28x28x1 ออกเป็นสองประเภท ("ของปลอม" และ "ของจริง")
- เครื่องมือเพิ่มประสิทธิภาพหนึ่งรายการสำหรับแต่ละรายการ
- ฟังก์ชันการสูญเสียเพื่อฝึกผู้เลือกปฏิบัติ
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
นี่คือคุณลักษณะที่สมบูรณ์ระดับ GAN เอาชนะ compile()
จะใช้ลายเซ็นของตัวเองและการดำเนินการตามขั้นตอนวิธีการ GAN ทั้งหมดใน 17 เส้นใน train_step
:
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super(GAN, self).__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
def compile(self, d_optimizer, g_optimizer, loss_fn):
super(GAN, self).compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return {"d_loss": d_loss, "g_loss": g_loss}
มาทดลองขับกัน:
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 3s 11ms/step - d_loss: 0.4031 - g_loss: 0.9305 <keras.callbacks.History at 0x7f5ad1b37c50>
แนวคิดเบื้องหลังการเรียนรู้เชิงลึกนั้นเรียบง่าย แล้วทำไมการนำไปปฏิบัติจึงเจ็บปวด?