ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ภาพรวม
บทช่วยสอนนี้สาธิตการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย API ลูปการฝึกอบรมแบบกำหนดเอง ซึ่งเผยแพร่ผ่าน MultiWorkerMirroredStrategy ดังนั้นโมเดล Keras ที่ออกแบบมาเพื่อทำงานบน ผู้ปฏิบัติงานคนเดียว สามารถทำงานกับผู้ปฏิบัติงานหลายคนได้อย่างราบรื่นโดยเปลี่ยนโค้ดเพียงเล็กน้อย
เรากำลังใช้ลูปการฝึกแบบกำหนดเองเพื่อฝึกโมเดลของเรา เนื่องจากช่วยให้มีความยืดหยุ่นและควบคุมการฝึกได้มากขึ้น ยิ่งไปกว่านั้น การดีบักโมเดลและลูปการฝึกทำได้ง่ายขึ้น ข้อมูลรายละเอียดเพิ่มเติมมีอยู่ใน การเขียนลูปการฝึกอบรมตั้งแต่เริ่มต้น
หากคุณกำลังมองหาวิธีใช้ MultiWorkerMirroredStrategy
กับ keras model.fit
ให้อ้างอิงกับบทช่วย สอน นี้แทน
คู่มือ การฝึกอบรมแบบกระจายใน TensorFlow มีให้สำหรับภาพรวมของกลยุทธ์การจัดจำหน่าย TensorFlow รองรับสำหรับผู้ที่สนใจในความเข้าใจที่ลึกซึ้งยิ่งขึ้นของ tf.distribute.Strategy
API
ติดตั้ง
ขั้นแรกให้นำเข้าที่จำเป็นบางอย่าง
import json
import os
import sys
ก่อนนำเข้า TensorFlow ให้ทำการเปลี่ยนแปลงบางอย่างกับสภาพแวดล้อม
ปิดการใช้งาน GPU ทั้งหมด ซึ่งจะป้องกันข้อผิดพลาดที่เกิดจากพนักงานทุกคนที่พยายามใช้ GPU เดียวกัน สำหรับการใช้งานจริง ผู้ปฏิบัติงานแต่ละคนจะอยู่บนเครื่องที่แตกต่างกัน
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
รีเซ็ตตัวแปรสภาพแวดล้อม TF_CONFIG
คุณจะเห็นข้อมูลเพิ่มเติมในภายหลัง
os.environ.pop('TF_CONFIG', None)
ตรวจสอบให้แน่ใจว่าไดเร็กทอรีปัจจุบันอยู่บนเส้นทางของหลาม ซึ่งช่วยให้สมุดบันทึกนำเข้าไฟล์ที่เขียนโดย %%writefile
ได้ในภายหลัง
if '.' not in sys.path:
sys.path.insert(0, '.')
ตอนนี้นำเข้า TensorFlow
import tensorflow as tf
นิยามชุดข้อมูลและโมเดล
ถัดไป สร้างไฟล์ mnist.py
ด้วยการตั้งค่าโมเดลและชุดข้อมูลอย่างง่าย ไฟล์ python นี้จะถูกใช้โดยกระบวนการของผู้ปฏิบัติงานในบทช่วยสอนนี้:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
การกำหนดค่าหลายคน
มาเข้าสู่โลกของการฝึกอบรมผู้ปฏิบัติงานหลายคนกันเถอะ ใน TensorFlow ตัวแปรสภาพแวดล้อม TF_CONFIG
จำเป็นสำหรับการฝึกอบรมบนเครื่องหลายเครื่อง ซึ่งแต่ละเครื่องอาจมีบทบาทที่แตกต่างกัน TF_CONFIG
ที่ใช้ด้านล่าง เป็นสตริง JSON ที่ใช้เพื่อระบุการกำหนดค่าคลัสเตอร์ในผู้ปฏิบัติงานแต่ละคนที่เป็นส่วนหนึ่งของคลัสเตอร์ นี่เป็นวิธีดีฟอลต์สำหรับการระบุคลัสเตอร์ โดยใช้ cluster_resolver.TFConfigClusterResolver
แต่มีตัวเลือกอื่นๆ ที่พร้อมใช้งานในโมดูล distribute.cluster_resolver
อธิบายคลัสเตอร์ของคุณ
นี่คือตัวอย่างการกำหนดค่า:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
นี่คือ TF_CONFIG
เดียวกันที่จัดลำดับเป็นสตริง JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
มีสององค์ประกอบของ TF_CONFIG
: cluster
และ task
cluster
จะเหมือนกันสำหรับผู้ปฏิบัติงานทุกคน และให้ข้อมูลเกี่ยวกับคลัสเตอร์การฝึกอบรม ซึ่งเป็น dict ที่ประกอบด้วยงานประเภทต่างๆ เช่นworker
ในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วยMultiWorkerMirroredStrategy
มักจะมีworker
หนึ่งที่รับผิดชอบมากกว่าเล็กน้อย เช่น การบันทึกจุดตรวจและการเขียนไฟล์สรุปสำหรับ TensorBoard นอกเหนือจากสิ่งที่worker
ทั่วไปทำ คนงานดังกล่าวเรียกว่าchief
คนงาน และเป็นเรื่องปกติที่ผู้worker
ที่มีindex
0 จะได้รับแต่งตั้งให้เป็นหัวหน้าworker
(อันที่จริง นี่คือวิธีการดำเนินการtf.distribute.Strategy
)task
ให้ข้อมูลของงานปัจจุบันและแตกต่างกันไปในแต่ละผู้ปฏิบัติงาน ระบุtype
และindex
ของผู้ปฏิบัติงานนั้น
ในตัวอย่างนี้ คุณตั้งค่า type
งานเป็น "worker"
และ index
งานเป็น 0
เครื่องนี้เป็นคนงานคนแรกและจะได้รับการแต่งตั้งเป็นหัวหน้าคนงานและทำงานมากกว่าเครื่องอื่น โปรดทราบว่าเครื่องอื่นจะต้องมีการตั้งค่าตัวแปรสภาพแวดล้อม TF_CONFIG
เช่นกัน และควรมี dict cluster
เดียวกัน แต่มี type
งานหรือ index
งานที่แตกต่างกัน ขึ้นอยู่กับบทบาทของเครื่องเหล่านั้น
เพื่อจุดประสงค์ในการอธิบายประกอบ บทช่วยสอนนี้แสดงวิธีที่บุคคลหนึ่งอาจตั้งค่า TF_CONFIG
กับ 2 คนทำงานบน localhost
ในทางปฏิบัติ ผู้ใช้จะสร้างผู้ปฏิบัติงานหลายคนบนที่อยู่/พอร์ต IP ภายนอก และตั้งค่า TF_CONFIG
กับผู้ปฏิบัติงานแต่ละคนอย่างเหมาะสม
ในตัวอย่างนี้ คุณจะใช้ผู้ปฏิบัติงาน 2 คน TF_CONFIG
ของผู้ปฏิบัติงานคนแรกแสดงไว้ด้านบน สำหรับผู้ปฏิบัติงานคนที่สอง คุณจะต้องตั้งค่า tf_config['task']['index']=1
ด้านบน tf_config
เป็นเพียงตัวแปรท้องถิ่นในไพ ธ อน หากต้องการใช้งานจริงเพื่อกำหนดค่าการฝึก พจนานุกรมนี้ต้องจัดลำดับเป็น JSON และวางไว้ในตัวแปรสภาพแวดล้อม TF_CONFIG
ตัวแปรสภาพแวดล้อมและกระบวนการย่อยในโน้ตบุ๊ก
กระบวนการย่อยรับช่วงตัวแปรสภาพแวดล้อมจากพาเรนต์ ดังนั้น หากคุณตั้งค่าตัวแปรสภาพแวดล้อมในกระบวนการ jupyter notebook
นี้:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
คุณสามารถเข้าถึงตัวแปรสภาพแวดล้อมได้จากกระบวนการย่อย:
echo ${GREETINGS}
Hello TensorFlow!
ในส่วนถัดไป คุณจะใช้สิ่งนี้เพื่อส่ง TF_CONFIG
ไปยังกระบวนการย่อยของผู้ปฏิบัติงาน คุณจะไม่มีวันเริ่มงานด้วยวิธีนี้จริงๆ แต่ก็เพียงพอสำหรับจุดประสงค์ของบทช่วยสอนนี้: เพื่อสาธิตตัวอย่างผู้ปฏิบัติงานหลายคนขั้นต่ำ
MultiWorkerMirroredStrategy
ในการฝึกโมเดล ให้ใช้อินสแตนซ์ของ tf.distribute.MultiWorkerMirroredStrategy
ซึ่งจะสร้างสำเนาของตัวแปรทั้งหมดในเลเยอร์ของโมเดลในแต่ละอุปกรณ์สำหรับผู้ปฏิบัติงานทั้งหมด คู่มือ tf.distribute.Strategy
มีรายละเอียดเพิ่มเติมเกี่ยวกับกลยุทธ์นี้
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
ใช้ tf.distribute.Strategy.scope
เพื่อระบุว่าควรใช้กลยุทธ์ในการสร้างแบบจำลองของคุณ สิ่งนี้ทำให้คุณอยู่ใน " บริบทการจำลองแบบข้าม " สำหรับกลยุทธ์นี้ ซึ่งหมายความว่ากลยุทธ์นี้ควบคุมสิ่งต่างๆ เช่น ตำแหน่งตัวแปร
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
แบ่งข้อมูลของคุณโดยอัตโนมัติระหว่างผู้ปฏิบัติงาน
ในการฝึกอบรมผู้ปฏิบัติงานหลายคน ไม่จำเป็นต้องมีการแบ่งกลุ่มข้อมูลชุดข้อมูล แต่จะให้ความหมายเพียงครั้งเดียว ซึ่งทำให้การฝึกอบรมสามารถทำซ้ำได้มากขึ้น กล่าวคือ การฝึกอบรมกับผู้ปฏิบัติงานหลายคนควรเหมือนกับการฝึกอบรมกับพนักงานคนเดียว หมายเหตุ: ประสิทธิภาพอาจได้รับผลกระทบในบางกรณี
ดู: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
กำหนด Custom Training Loop และ Train the model
ระบุเครื่องมือเพิ่มประสิทธิภาพ
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
กำหนดขั้นตอนการฝึกด้วย tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
การบันทึกและฟื้นฟูจุดตรวจ
การใช้จุดตรวจสอบใน Custom Training Loop กำหนดให้ผู้ใช้จัดการแทนที่จะใช้การเรียกกลับ keras ช่วยให้คุณบันทึกน้ำหนักของโมเดลและกู้คืนได้โดยไม่ต้องบันทึกทั้งโมเดล
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
ที่นี่ คุณจะต้องสร้าง tf.train.Checkpoint
หนึ่งตัวที่ติดตามโมเดล ซึ่งจัดการโดย tf.train.CheckpointManager
เพื่อคงไว้เฉพาะจุดตรวจล่าสุดเท่านั้น
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
เมื่อคุณต้องการคืนค่า คุณสามารถค้นหาจุดตรวจล่าสุดที่บันทึกไว้โดยใช้ฟังก์ชัน tf.train.latest_checkpoint
ที่สะดวก
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
หลังจากกู้คืนจุดตรวจแล้ว คุณสามารถดำเนินการฝึกวนรอบการฝึกแบบกำหนดเองต่อไปได้
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
การตั้งค่ารหัสแบบเต็มสำหรับผู้ปฏิบัติงาน
หากต้องการใช้งาน MultiWorkerMirroredStrategy
คุณจะต้องเรียกใช้กระบวนการของผู้ปฏิบัติงานและส่ง TF_CONFIG
ให้พวกเขา
เช่นเดียวกับไฟล์ mnist.py
ที่เขียนไว้ก่อนหน้านี้ นี่คือ main.py
ที่มีโค้ดเดียวกันกับที่เราได้ดำเนินการไปทีละขั้นตอนก่อนหน้านี้ใน colab นี้ เราเพียงแค่เขียนลงในไฟล์เพื่อให้พนักงานแต่ละคนสามารถเรียกใช้ได้:
ไฟล์: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
ฝึกฝนและประเมินผล
ไดเร็กทอรีปัจจุบันมีทั้งไฟล์ Python:
ls *.py
main.py mnist.py
ดังนั้น json-serialize TF_CONFIG
และเพิ่มลงในตัวแปรสภาพแวดล้อม:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
ตอนนี้คุณสามารถเปิดกระบวนการของผู้ปฏิบัติงานที่จะเรียกใช้ main.py
และใช้ TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
มีบางสิ่งที่ควรทราบเกี่ยวกับคำสั่งข้างต้น:
- มันใช้
%%bash
ซึ่งเป็น "เวทย์มนตร์" ของโน้ตบุ๊ก เพื่อเรียกใช้คำสั่งทุบตี - มันใช้แฟ
--bg
เพื่อรันกระบวนการbash
ในพื้นหลัง เนื่องจากผู้ปฏิบัติงานนี้จะไม่ยุติการทำงาน มันรอคนงานทั้งหมดก่อนที่จะเริ่ม
กระบวนการทำงานเบื้องหลังจะไม่พิมพ์ผลลัพธ์ไปยังสมุดบันทึกนี้ ดังนั้น &>
จึงเปลี่ยนเส้นทางเอาต์พุตไปยังไฟล์ เพื่อให้คุณเห็นว่าเกิดอะไรขึ้น
ดังนั้น รอสักครู่เพื่อให้กระบวนการเริ่มต้นขึ้น:
import time
time.sleep(20)
ตอนนี้ดูสิ่งที่ส่งออกไปยังไฟล์บันทึกของผู้ปฏิบัติงาน:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
บรรทัดสุดท้ายของล็อกไฟล์ควรระบุว่า: Started server with target: grpc://localhost:12345
ตอนนี้ผู้ปฏิบัติงานคนแรกพร้อมแล้ว และกำลังรอผู้ปฏิบัติงานคนอื่นๆ ให้พร้อมดำเนินการต่อไป
ดังนั้นให้อัปเดต tf_config
เพื่อให้กระบวนการของผู้ปฏิบัติงานคนที่สองได้รับ:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
ตอนนี้เปิดตัวผู้ปฏิบัติงานที่สอง การดำเนินการนี้จะเริ่มการฝึกอบรมเนื่องจากพนักงานทุกคนทำงานอยู่ (ดังนั้นจึงไม่จำเป็นต้องดำเนินการตามขั้นตอน):
python main.py > /dev/null 2>&1
ตอนนี้ หากคุณตรวจสอบบันทึกที่เขียนโดยพนักงานคนแรกอีกครั้ง คุณจะเห็นว่าบันทึกนั้นเข้าร่วมในการฝึกโมเดลนั้น:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
การฝึกอบรมพนักงานหลายคนในเชิงลึก
บทช่วยสอนนี้ได้สาธิตเวิร์กโฟลว์ Custom Training Loop
ของการตั้งค่าผู้ปฏิบัติงานหลายคน คำอธิบายโดยละเอียดของหัวข้ออื่นๆ มีอยู่ในคู่มือ model.fit's guide
เกี่ยวกับการตั้งค่าผู้ปฏิบัติงานหลายคนและใช้ได้กับ CTL
ดูสิ่งนี้ด้วย
- คู่มือ การฝึกอบรมแบบกระจายใน TensorFlow ให้ภาพรวมของกลยุทธ์การจัดจำหน่ายที่มีอยู่
- โมเดลอย่างเป็นทางการ ซึ่งหลายแบบสามารถกำหนดค่าให้รันกลยุทธ์การจัดจำหน่ายได้หลายแบบ
- ส่วนประสิทธิภาพ ในคู่มือนี้ให้ข้อมูลเกี่ยวกับกลยุทธ์และ เครื่องมือ อื่นๆ ที่คุณสามารถใช้เพื่อปรับประสิทธิภาพของโมเดล TensorFlow ของคุณให้เหมาะสม