Keras और MultiWorkerMirroredStrategy के साथ कस्टम ट्रेनिंग लूप

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

यह ट्यूटोरियल कस्टम ट्रेनिंग लूप एपीआई के साथ मल्टी-वर्कर ट्रेनिंग को प्रदर्शित करता है, जिसे मल्टीवर्कर मिररडस्ट्रेटी के माध्यम से वितरित किया जाता है, इसलिए सिंगल-वर्कर पर चलने के लिए डिज़ाइन किया गया केरस मॉडल न्यूनतम कोड परिवर्तन के साथ कई वर्कर्स पर मूल रूप से काम कर सकता है।

हम अपने मॉडल को प्रशिक्षित करने के लिए कस्टम प्रशिक्षण लूप का उपयोग कर रहे हैं क्योंकि वे हमें लचीलापन और प्रशिक्षण पर अधिक नियंत्रण प्रदान करते हैं। इसके अलावा, मॉडल और प्रशिक्षण लूप को डीबग करना आसान है। शुरुआत से प्रशिक्षण लूप लिखने में अधिक विस्तृत जानकारी उपलब्ध है।

यदि आप keras model.fit के साथ MultiWorkerMirroredStrategy का उपयोग करना चाहते हैं, तो इसके बजाय इस ट्यूटोरियल को देखें।

TensorFlow गाइड में वितरित प्रशिक्षण tf.distribute.Strategy API की गहरी समझ में रुचि रखने वालों के लिए TensorFlow का समर्थन करने वाली वितरण रणनीतियों के अवलोकन के लिए उपलब्ध है।

सेट अप

सबसे पहले, कुछ आवश्यक आयात।

import json
import os
import sys

TensorFlow आयात करने से पहले, पर्यावरण में कुछ बदलाव करें।

सभी GPU अक्षम करें। यह सभी एक ही GPU का उपयोग करने की कोशिश कर रहे श्रमिकों द्वारा होने वाली त्रुटियों को रोकता है। एक वास्तविक अनुप्रयोग के लिए प्रत्येक कार्यकर्ता एक अलग मशीन पर होगा।

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG परिवेश चर रीसेट करें, आप इसके बारे में बाद में और देखेंगे।

os.environ.pop('TF_CONFIG', None)

सुनिश्चित करें कि वर्तमान निर्देशिका अजगर के पथ पर है। यह नोटबुक को बाद में %%writefile द्वारा लिखी गई फाइलों को आयात करने की अनुमति देता है।

if '.' not in sys.path:
  sys.path.insert(0, '.')

अब TensorFlow आयात करें।

import tensorflow as tf

डेटासेट और मॉडल परिभाषा

इसके बाद एक साधारण मॉडल और डेटासेट सेटअप के साथ एक mnist.py फ़ाइल बनाएँ। इस ट्यूटोरियल में कार्यकर्ता-प्रक्रियाओं द्वारा इस पायथन फ़ाइल का उपयोग किया जाएगा:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

बहु-कार्यकर्ता विन्यास

आइए अब बहु-कार्यकर्ता प्रशिक्षण की दुनिया में प्रवेश करें। TensorFlow में, कई मशीनों पर प्रशिक्षण के लिए TF_CONFIG पर्यावरण चर की आवश्यकता होती है, जिनमें से प्रत्येक की संभवतः एक अलग भूमिका होती है। TF_CONFIG नीचे उपयोग किया गया, एक JSON स्ट्रिंग है जिसका उपयोग क्लस्टर के प्रत्येक कार्यकर्ता पर क्लस्टर कॉन्फ़िगरेशन को निर्दिष्ट करने के लिए किया जाता है। यह क्लस्टर_रेसोल्वर. cluster_resolver.TFConfigClusterResolver का उपयोग करते हुए किसी क्लस्टर को निर्दिष्ट करने के लिए डिफ़ॉल्ट विधि है, लेकिन distribute.cluster_resolver .cluster_resolver मॉड्यूल में अन्य विकल्प उपलब्ध हैं।

अपने क्लस्टर का वर्णन करें

यहाँ एक उदाहरण विन्यास है:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

यहाँ वही TF_CONFIG है जो JSON स्ट्रिंग के रूप में क्रमबद्ध है:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG के दो घटक हैं: cluster और task

  • cluster सभी श्रमिकों के लिए समान है और प्रशिक्षण क्लस्टर के बारे में जानकारी प्रदान करता है, जो कि विभिन्न प्रकार की नौकरियों जैसे कि worker से युक्त एक निर्देश है। MultiWorkerMirroredStrategy के साथ बहु-कार्यकर्ता प्रशिक्षण में, आमतौर पर एक worker होता है जो एक नियमित worker के अलावा थोड़ी अधिक जिम्मेदारी लेता है जैसे कि चेकपॉइंट को सहेजना और TensorBoard के लिए सारांश फ़ाइल लिखना। ऐसे कार्यकर्ता को chief कार्यकर्ता के रूप में संदर्भित किया जाता है, और यह प्रथागत है कि index 0 वाले worker को मुख्य worker के रूप में नियुक्त किया जाता है (वास्तव में इस तरह tf.distribute.Strategy .वितरण। रणनीति लागू की जाती है)।

  • task वर्तमान कार्य की जानकारी प्रदान करता है और प्रत्येक कार्यकर्ता पर भिन्न होता है। यह उस कार्यकर्ता के type और index को निर्दिष्ट करता है।

इस उदाहरण में, आप कार्य type को "worker" और कार्य index को 0 पर सेट करते हैं। यह मशीन प्रथम श्रमिक है और मुख्य कार्यकर्ता के रूप में नियुक्त की जाएगी और अन्य की तुलना में अधिक कार्य करेगी। ध्यान दें कि अन्य मशीनों को TF_CONFIG पर्यावरण चर सेट की भी आवश्यकता होगी, और इसमें समान cluster निर्देश होना चाहिए, लेकिन उन मशीनों की भूमिका के आधार पर अलग-अलग कार्य type या कार्य index होनी चाहिए।

उदाहरण के लिए, यह ट्यूटोरियल दिखाता है कि कैसे कोई localhost पर 2 श्रमिकों के साथ TF_CONFIG सेट कर सकता है। व्यवहार में, उपयोगकर्ता बाहरी आईपी पते/बंदरगाहों पर कई कर्मचारी बनाएंगे, और प्रत्येक कार्यकर्ता पर उचित रूप से TF_CONFIG सेट करेंगे।

इस उदाहरण में आप 2 श्रमिकों का उपयोग करेंगे, पहले कार्यकर्ता का TF_CONFIG ऊपर दिखाया गया है। दूसरे कार्यकर्ता के लिए आप सेट करेंगे tf_config['task']['index']=1

ऊपर, tf_config अजगर में सिर्फ एक स्थानीय चर है। प्रशिक्षण को कॉन्फ़िगर करने के लिए वास्तव में इसका उपयोग करने के लिए, इस शब्दकोश को JSON के रूप में क्रमबद्ध किया जाना चाहिए, और TF_CONFIG पर्यावरण चर में रखा जाना चाहिए।

नोटबुक में पर्यावरण चर और उपप्रक्रियाएं

उपप्रक्रियाओं को अपने माता-पिता से पर्यावरण चर विरासत में मिलते हैं। इसलिए यदि आप इस jupyter notebook प्रक्रिया में एक पर्यावरण चर सेट करते हैं:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

आप उप-प्रक्रियाओं से पर्यावरण चर का उपयोग कर सकते हैं:

echo ${GREETINGS}
Hello TensorFlow!

अगले भाग में, आप इसका उपयोग TF_CONFIG को वर्कर सबप्रोसेस में पास करने के लिए करेंगे। आप वास्तव में अपनी नौकरियों को इस तरह से कभी लॉन्च नहीं करेंगे, लेकिन यह इस ट्यूटोरियल के उद्देश्यों के लिए पर्याप्त है: एक न्यूनतम बहु-कार्यकर्ता उदाहरण प्रदर्शित करने के लिए।

MultiWorkerप्रतिबिंबित रणनीति

मॉडल को प्रशिक्षित करने के लिए, tf.distribute.MultiWorkerMirroredStrategy के एक उदाहरण का उपयोग करें, जो सभी श्रमिकों के लिए प्रत्येक डिवाइस पर मॉडल की परतों में सभी चर की प्रतियां बनाता है। इस रणनीति के बारे में tf. tf.distribute.Strategy मार्गदर्शिका में अधिक विवरण हैं।

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

यह निर्दिष्ट करने के लिए कि आपके मॉडल का निर्माण करते समय एक रणनीति का उपयोग किया जाना चाहिए, tf.distribute.Strategy.scope का उपयोग करें। यह आपको इस रणनीति के लिए " क्रॉस-रेप्लिका संदर्भ " में डालता है, जिसका अर्थ है कि रणनीति को चर प्लेसमेंट जैसी चीजों के नियंत्रण में रखा गया है।

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

कर्मचारियों के बीच अपने डेटा को ऑटो-शेयर करें

बहु-कार्यकर्ता प्रशिक्षण में, डेटासेट शार्डिंग की आवश्यकता नहीं होती है, हालांकि यह आपको बिल्कुल-एक बार शब्दार्थ देता है जो अधिक प्रशिक्षण को अधिक प्रतिलिपि प्रस्तुत करने योग्य बनाता है, अर्थात कई श्रमिकों पर प्रशिक्षण एक कार्यकर्ता पर प्रशिक्षण के समान होना चाहिए। नोट: कुछ मामलों में प्रदर्शन प्रभावित हो सकता है।

देखें: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

कस्टम प्रशिक्षण लूप को परिभाषित करें और मॉडल को प्रशिक्षित करें

एक अनुकूलक निर्दिष्ट करें

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

tf.function के साथ एक प्रशिक्षण चरण को परिभाषित करें

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

चेकपॉइंट को सहेजना और बहाल करना

कस्टम ट्रेनिंग लूप में चेकपॉइंटिंग कार्यान्वयन के लिए उपयोगकर्ता को केरस कॉलबैक का उपयोग करने के बजाय इसे संभालने की आवश्यकता होती है। यह आपको पूरे मॉडल को सहेजे बिना मॉडल के वजन को बचाने और उन्हें पुनर्स्थापित करने की अनुमति देता है।

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

यहां, आप एक tf.train.Checkpoint जो मॉडल को ट्रैक करता है, जिसे एक tf.train.CheckpointManager द्वारा प्रबंधित किया जाता है ताकि केवल नवीनतम चेकपॉइंट संरक्षित रहे।

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

अब, जब आपको पुनर्स्थापित करने की आवश्यकता होती है, तो आप सुविधाजनक tf.train.latest_checkpoint फ़ंक्शन का उपयोग करके सहेजे गए नवीनतम चेकपॉइंट को पा सकते हैं।

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

चेकपॉइंट को पुनर्स्थापित करने के बाद, आप अपने कस्टम प्रशिक्षण लूप का प्रशिक्षण जारी रख सकते हैं।

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

कर्मचारियों पर पूर्ण कोड सेटअप

वास्तव में MultiWorkerMirroredStrategy के साथ चलने के लिए आपको कार्यकर्ता प्रक्रियाओं को चलाने और उन्हें TF_CONFIG पास करने की आवश्यकता होगी।

पहले लिखी गई mnist.py फ़ाइल की तरह, यहाँ main.py है जिसमें वही कोड है जो हमने पहले इस कोलाब में चरण दर चरण चला था, हम इसे केवल एक फ़ाइल में लिख रहे हैं ताकि प्रत्येक कर्मचारी इसे चलाए:

फ़ाइल: main.py

Writing main.py

ट्रेन और मूल्यांकन

वर्तमान निर्देशिका में अब दोनों पायथन फाइलें हैं:

ls *.py
main.py
mnist.py

तो TF_CONFIG को json-serialize करें और इसे पर्यावरण चर में जोड़ें:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

अब, आप एक कार्यकर्ता प्रक्रिया शुरू कर सकते हैं जो main.py चलाएगी और main.py का उपयोग TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

उपरोक्त आदेश के बारे में ध्यान देने योग्य कुछ बातें हैं:

  1. यह %%bash का उपयोग करता है जो कुछ बैश कमांड चलाने के लिए एक नोटबुक "मैजिक" है।
  2. यह पृष्ठभूमि में bash प्रक्रिया को चलाने के लिए --bg ध्वज का उपयोग करता है, क्योंकि यह कार्यकर्ता समाप्त नहीं होगा। यह शुरू होने से पहले सभी श्रमिकों की प्रतीक्षा करता है।

बैकग्राउंड वर्कर प्रक्रिया इस नोटबुक में आउटपुट प्रिंट नहीं करेगी, इसलिए &> इसके आउटपुट को एक फाइल पर रीडायरेक्ट करता है, ताकि आप देख सकें कि क्या हुआ।

इसलिए, प्रक्रिया शुरू होने के लिए कुछ सेकंड प्रतीक्षा करें:

import time
time.sleep(20)

अब देखें कि कार्यकर्ता के लॉगफाइल में अब तक क्या आउटपुट रहा है:

cat job_0.log
34 एल10एन-प्लेसहोल्डर
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

लॉग फ़ाइल की अंतिम पंक्ति कहनी चाहिए Started server with target: grpc://localhost:12345 । पहला कार्यकर्ता अब तैयार है, और अन्य सभी कार्यकर्ताओं के आगे बढ़ने के लिए तैयार होने की प्रतीक्षा कर रहा है।

इसलिए tf_config को अपडेट करें ताकि दूसरे कार्यकर्ता को पिक-अप करने की प्रक्रिया मिल सके:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

अब दूसरा कार्यकर्ता लॉन्च करें। यह प्रशिक्षण शुरू करेगा क्योंकि सभी कार्यकर्ता सक्रिय हैं (इसलिए इस प्रक्रिया को पृष्ठभूमि में रखने की कोई आवश्यकता नहीं है):

python main.py > /dev/null 2>&1

अब यदि आप पहले कार्यकर्ता द्वारा लिखे गए लॉग को दोबारा जांचते हैं तो आप देखेंगे कि उसने उस मॉडल के प्रशिक्षण में भाग लिया था:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

गहराई से बहु कार्यकर्ता प्रशिक्षण

इस ट्यूटोरियल ने मल्टी-वर्कर सेटअप के Custom Training Loop वर्कफ़्लो का प्रदर्शन किया है। अन्य विषयों का विस्तृत विवरण बहु-कार्यकर्ता सेटअप के model.fit's guide में उपलब्ध है और सीटीएल पर लागू होता है।

यह सभी देखें

  1. TensorFlow गाइड में वितरित प्रशिक्षण उपलब्ध वितरण रणनीतियों का अवलोकन प्रदान करता है।
  2. आधिकारिक मॉडल , जिनमें से कई को कई वितरण रणनीतियों को चलाने के लिए कॉन्फ़िगर किया जा सकता है।
  3. गाइड में प्रदर्शन अनुभाग अन्य रणनीतियों और उपकरणों के बारे में जानकारी प्रदान करता है जिनका उपयोग आप अपने TensorFlow मॉडल के प्रदर्शन को अनुकूलित करने के लिए कर सकते हैं।