TensorFlow.org-এ দেখুন | Google Colab-এ চালান | GitHub-এ উৎস দেখুন | নোটবুক ডাউনলোড করুন |
ওভারভিউ
এই টিউটোরিয়ালটি কাস্টম ট্রেনিং লুপ API সহ মাল্টি-ওয়ার্কার প্রশিক্ষণ প্রদর্শন করে, মাল্টিওয়ার্কার মিররড স্ট্র্যাটেজির মাধ্যমে বিতরণ করা হয়েছে, তাই একক-কর্মীর উপর চালানোর জন্য ডিজাইন করা কেরাস মডেলটি ন্যূনতম কোড পরিবর্তন সহ একাধিক কর্মীদের উপর নির্বিঘ্নে কাজ করতে পারে।
আমরা আমাদের মডেলকে প্রশিক্ষণ দেওয়ার জন্য কাস্টম প্রশিক্ষণ লুপ ব্যবহার করছি কারণ তারা আমাদের নমনীয়তা দেয় এবং প্রশিক্ষণের উপর আরও বেশি নিয়ন্ত্রণ দেয়। তাছাড়া, মডেল এবং প্রশিক্ষণ লুপ ডিবাগ করা সহজ। স্ক্র্যাচ থেকে একটি প্রশিক্ষণ লুপ লেখাতে আরও বিশদ তথ্য পাওয়া যায়।
আপনি যদি keras model.fit
এর সাথে MultiWorkerMirroredStrategy
কীভাবে ব্যবহার করবেন তা খুঁজছেন তবে পরিবর্তে এই টিউটোরিয়ালটি পড়ুন।
যারা tf.distribute.Strategy
APIs সম্পর্কে গভীরভাবে বুঝতে আগ্রহী তাদের জন্য TensorFlow সমর্থন করে এমন বিতরণ কৌশলগুলির একটি সংক্ষিপ্ত বিবরণের জন্য TensorFlow গাইডে বিতরণকৃত প্রশিক্ষণ উপলব্ধ।
সেটআপ
প্রথমত, কিছু প্রয়োজনীয় আমদানি।
import json
import os
import sys
TensorFlow আমদানি করার আগে, পরিবেশে কিছু পরিবর্তন করুন।
সমস্ত GPU নিষ্ক্রিয় করুন। এটি একই GPU ব্যবহার করার চেষ্টাকারী কর্মীদের দ্বারা সৃষ্ট ত্রুটিগুলিকে প্রতিরোধ করে৷ একটি বাস্তব প্রয়োগের জন্য প্রতিটি কর্মী একটি ভিন্ন মেশিনে থাকবে।
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
TF_CONFIG
এনভায়রনমেন্ট ভেরিয়েবল রিসেট করুন, আপনি এই সম্পর্কে পরে আরও দেখতে পাবেন।
os.environ.pop('TF_CONFIG', None)
নিশ্চিত করুন যে বর্তমান ডিরেক্টরিটি পাইথনের পথে রয়েছে। এটি %%writefile
দ্বারা লিখিত ফাইলগুলি আমদানি করার অনুমতি দেয়।
if '.' not in sys.path:
sys.path.insert(0, '.')
এখন TensorFlow আমদানি করুন।
import tensorflow as tf
ডেটাসেট এবং মডেল সংজ্ঞা
এরপর একটি সাধারণ মডেল এবং ডেটাসেট সেটআপ সহ একটি mnist.py
ফাইল তৈরি করুন। এই টিউটোরিয়ালটিতে কর্মী-প্রক্রিয়াগুলি দ্বারা এই পাইথন ফাইলটি ব্যবহার করা হবে:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
মাল্টি-কর্মী কনফিগারেশন
এখন মাল্টি-কর্মী প্রশিক্ষণের জগতে প্রবেশ করা যাক। TensorFlow-এ, একাধিক মেশিনে প্রশিক্ষণের জন্য TF_CONFIG
এনভায়রনমেন্ট ভেরিয়েবল প্রয়োজন, যার প্রতিটিরই সম্ভবত আলাদা ভূমিকা রয়েছে। নীচে ব্যবহৃত TF_CONFIG
হল একটি JSON স্ট্রিং যা ক্লাস্টারের অংশ প্রতিটি কর্মীর উপর ক্লাস্টার কনফিগারেশন নির্দিষ্ট করতে ব্যবহৃত হয়। এটি একটি ক্লাস্টার নির্দিষ্ট করার জন্য ডিফল্ট পদ্ধতি, cluster_resolver.TFConfigClusterResolver
ব্যবহার করে, কিন্তু distribute.cluster_resolver
মডিউলে অন্যান্য বিকল্প উপলব্ধ রয়েছে।
আপনার ক্লাস্টার বর্ণনা করুন
এখানে একটি উদাহরণ কনফিগারেশন আছে:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
এখানে একটি JSON স্ট্রিং হিসাবে একই TF_CONFIG
সিরিয়াল করা হয়েছে:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
TF_CONFIG
এর দুটি উপাদান রয়েছে: cluster
এবং task
।
cluster
সকল কর্মীদের জন্য একই এবং প্রশিক্ষণ ক্লাস্টার সম্পর্কে তথ্য প্রদান করে, যেটি বিভিন্ন ধরণের কাজ যেমনworker
সমন্বিত একটি ডিক্ট।MultiWorkerMirroredStrategy
এর সাথে মাল্টি-ওয়ার্কার প্রশিক্ষণে, সাধারণত একজনworker
থাকে যে একজন নিয়মিতworker
যা করে তার পাশাপাশি টেনসরবোর্ডের জন্য চেকপয়েন্ট সংরক্ষণ এবং সারাংশ ফাইল লেখার মতো একটু বেশি দায়িত্ব নেয়। এই ধরনের একজন কর্মীকেchief
কর্মী হিসাবে উল্লেখ করা হয়, এবং এটি প্রথাগত যেindex
0 সহworker
প্রধানworker
হিসাবে নিযুক্ত করা হয় (আসলে এইভাবেtf.distribute.Strategy
প্রয়োগ করা হয়)।task
বর্তমান কাজের তথ্য প্রদান করে এবং প্রতিটি কর্মীর জন্য আলাদা। এটি সেই কর্মীরtype
এবংindex
নির্দিষ্ট করে।
এই উদাহরণে, আপনি টাস্ক type
"worker"
এবং টাস্ক index
0
এ সেট করুন। এই মেশিনটিই প্রথম কর্মী এবং প্রধান কর্মী হিসেবে নিযুক্ত হবেন এবং অন্যদের চেয়ে বেশি কাজ করবেন। মনে রাখবেন যে অন্যান্য মেশিনগুলিতেও TF_CONFIG
এনভায়রনমেন্ট ভেরিয়েবল সেট থাকতে হবে, এবং এটিতে একই cluster
ডিক্ট থাকা উচিত, কিন্তু সেই মেশিনগুলির ভূমিকা কী তার উপর নির্ভর করে ভিন্ন টাস্ক type
বা টাস্ক index
।
উদাহরণের উদ্দেশ্যে, এই টিউটোরিয়ালটি দেখায় কিভাবে একজন localhost
2 জন কর্মী নিয়ে একটি TF_CONFIG
সেট করতে পারে। অনুশীলনে, ব্যবহারকারীরা বাহ্যিক IP ঠিকানা/পোর্টে একাধিক কর্মী তৈরি করবে এবং প্রতিটি কর্মীকে যথাযথভাবে TF_CONFIG
সেট করবে।
এই উদাহরণে আপনি 2 জন কর্মী ব্যবহার করবেন, প্রথম কর্মীর TF_CONFIG
উপরে দেখানো হয়েছে। দ্বিতীয় কর্মীর জন্য আপনি tf_config['task']['index']=1
সেট করবেন
উপরে, tf_config
একটি স্থানীয় পরিবর্তনশীল। আসলে প্রশিক্ষণ কনফিগার করতে এটি ব্যবহার করার জন্য, এই অভিধানটিকে JSON হিসাবে সিরিয়াল করা দরকার এবং TF_CONFIG
এনভায়রনমেন্ট ভেরিয়েবলে স্থাপন করা দরকার।
নোটবুকগুলিতে পরিবেশের পরিবর্তনশীল এবং সাবপ্রসেস
সাবপ্রসেসগুলি তাদের পিতামাতার কাছ থেকে এনভায়রনমেন্ট ভেরিয়েবলের উত্তরাধিকারী হয়। সুতরাং আপনি যদি এই jupyter notebook
প্রক্রিয়াতে একটি পরিবেশ পরিবর্তনশীল সেট করেন:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
আপনি একটি সাবপ্রসেস থেকে পরিবেশ পরিবর্তনশীল অ্যাক্সেস করতে পারেন:
echo ${GREETINGS}
Hello TensorFlow!
পরবর্তী বিভাগে, আপনি TF_CONFIG
কর্মী সাবপ্রসেসে পাস করতে এটি ব্যবহার করবেন। আপনি সত্যিই এইভাবে আপনার কাজগুলি কখনই চালু করবেন না, তবে এই টিউটোরিয়ালের উদ্দেশ্যে এটি যথেষ্ট: একটি ন্যূনতম বহু-কর্মী উদাহরণ প্রদর্শন করার জন্য।
মাল্টিওয়ার্কার মিররড স্ট্র্যাটেজি
মডেলটি প্রশিক্ষণের জন্য, tf.distribute.MultiWorkerMirroredStrategy
এর একটি উদাহরণ ব্যবহার করুন, যা সমস্ত কর্মী জুড়ে প্রতিটি ডিভাইসে মডেলের স্তরগুলিতে সমস্ত ভেরিয়েবলের অনুলিপি তৈরি করে৷ tf.distribute.Strategy
এই কৌশল সম্পর্কে আরও বিশদ রয়েছে।
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
আপনার মডেল তৈরি করার সময় একটি কৌশল ব্যবহার করা উচিত তা নির্দিষ্ট করতে tf.distribute.Strategy.scope
ব্যবহার করুন। এটি আপনাকে এই কৌশলটির জন্য " ক্রস-প্রতিলিপি প্রসঙ্গে " রাখে, যার অর্থ কৌশলটি পরিবর্তনশীল স্থান নির্ধারণের মতো জিনিসগুলির নিয়ন্ত্রণে রাখা হয়।
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
কর্মীদের জুড়ে আপনার ডেটা স্বয়ংক্রিয়ভাবে ভাগ করুন
বহু-কর্মী প্রশিক্ষণে, ডেটাসেট শার্ডিংয়ের প্রয়োজন হয় না, তবে এটি আপনাকে ঠিক-একবার শব্দার্থবিদ্যা দেয় যা আরও প্রশিক্ষণকে আরও পুনরুত্পাদনযোগ্য করে তোলে, অর্থাৎ একাধিক কর্মীদের প্রশিক্ষণ একজন শ্রমিকের প্রশিক্ষণের মতোই হওয়া উচিত। দ্রষ্টব্য: কিছু ক্ষেত্রে কর্মক্ষমতা প্রভাবিত হতে পারে।
দেখুন: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
কাস্টম ট্রেনিং লুপ সংজ্ঞায়িত করুন এবং মডেলটিকে প্রশিক্ষণ দিন
একটি অপ্টিমাইজার নির্দিষ্ট করুন
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
tf.function
সহ একটি প্রশিক্ষণ ধাপ সংজ্ঞায়িত করুন
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
চেকপয়েন্ট সংরক্ষণ এবং পুনরুদ্ধার
একটি কাস্টম ট্রেনিং লুপে চেকপয়েন্টিং বাস্তবায়নের জন্য ব্যবহারকারীকে কেরাস কলব্যাক ব্যবহার করার পরিবর্তে এটি পরিচালনা করতে হবে। এটি আপনাকে মডেলের ওজন সংরক্ষণ করতে এবং পুরো মডেলটি সংরক্ষণ না করেই সেগুলি পুনরুদ্ধার করতে দেয়।
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
এখানে, আপনি একটি tf.train.Checkpoint
তৈরি করবেন যা মডেলটিকে ট্র্যাক করে, যেটি একটি tf.train.CheckpointManager
দ্বারা পরিচালিত হয় যাতে শুধুমাত্র সর্বশেষ চেকপয়েন্টটি সংরক্ষিত থাকে৷
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
এখন, যখন আপনাকে পুনরুদ্ধার করতে হবে, আপনি সুবিধাজনক tf.train.latest_checkpoint
ফাংশন ব্যবহার করে সংরক্ষিত সর্বশেষ চেকপয়েন্টটি খুঁজে পেতে পারেন।
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
চেকপয়েন্ট পুনরুদ্ধার করার পরে, আপনি আপনার কাস্টম প্রশিক্ষণ লুপ প্রশিক্ষণ চালিয়ে যেতে পারেন।
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
কর্মীদের উপর সম্পূর্ণ কোড সেটআপ
আসলে MultiWorkerMirroredStrategy
সাথে চালানোর জন্য আপনাকে কর্মী প্রক্রিয়া চালাতে হবে এবং তাদের কাছে একটি TF_CONFIG
হবে।
আগে লেখা mnist.py
ফাইলের মতো, এখানে main.py
একই কোড রয়েছে যা আমরা এই কোল্যাবে আগে ধাপে ধাপে দেখেছি, আমরা এটিকে একটি ফাইলে লিখছি যাতে প্রতিটি কর্মী এটি চালাতে পারে:
ফাইল: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
ট্রেন এবং মূল্যায়ন
বর্তমান ডিরেক্টরিতে এখন উভয় পাইথন ফাইল রয়েছে:
ls *.py
main.py mnist.py
তাই TF_CONFIG-কে TF_CONFIG
ক্রমিক করুন এবং পরিবেশের ভেরিয়েবলে যোগ করুন:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
এখন, আপনি একটি কর্মী প্রক্রিয়া চালু করতে পারেন যা TF_CONFIG
চালাবে এবং main.py
ব্যবহার করবে:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
উপরের কমান্ড সম্পর্কে নোট করার জন্য কয়েকটি জিনিস রয়েছে:
- এটি
%%bash
ব্যবহার করে যা কিছু ব্যাশ কমান্ড চালানোর জন্য একটি নোটবুক "জাদু" । - এটি ব্যাকগ্রাউন্ডে
bash
প্রক্রিয়া চালানোর জন্য--bg
পতাকা ব্যবহার করে, কারণ এই কর্মী শেষ হবে না। এটি শুরু হওয়ার আগে সমস্ত কর্মীদের জন্য অপেক্ষা করে।
ব্যাকগ্রাউন্ডেড ওয়ার্কার প্রসেস এই নোটবুকে আউটপুট প্রিন্ট করবে না, তাই &>
তার আউটপুটকে একটি ফাইলে রিডাইরেক্ট করে, যাতে আপনি দেখতে পারেন কি হয়েছে।
সুতরাং, প্রক্রিয়াটি শুরু হওয়ার জন্য কয়েক সেকেন্ড অপেক্ষা করুন:
import time
time.sleep(20)
এখন পর্যন্ত কর্মীর লগফাইলে কী আউটপুট হয়েছে তা দেখুন:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
লগ ফাইলের শেষ লাইনে বলা উচিত: Started server with target: grpc://localhost:12345
। প্রথম কর্মী এখন প্রস্তুত, এবং অন্য সমস্ত কর্মী(গুলি) এগিয়ে যাওয়ার জন্য প্রস্তুত হওয়ার জন্য অপেক্ষা করছে৷
সুতরাং দ্বিতীয় কর্মীর প্রক্রিয়া বাছাই করার জন্য tf_config
আপডেট করুন:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
এখন দ্বিতীয় কর্মী চালু করুন। এটি প্রশিক্ষণ শুরু করবে যেহেতু সমস্ত কর্মী সক্রিয় রয়েছে (তাই এই প্রক্রিয়াটির ব্যাকগ্রাউন্ড করার দরকার নেই):
python main.py > /dev/null 2>&1
এখন আপনি যদি প্রথম কর্মী দ্বারা লিখিত লগগুলি পুনরায় পরীক্ষা করেন তবে আপনি দেখতে পাবেন যে এটি সেই মডেলের প্রশিক্ষণে অংশগ্রহণ করেছে:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
গভীরভাবে বহু কর্মী প্রশিক্ষণ
এই টিউটোরিয়ালটি মাল্টি-ওয়ার্কার সেটআপের একটি Custom Training Loop
ওয়ার্কফ্লো প্রদর্শন করেছে। মাল্টি-ওয়ার্কার সেটআপের model.fit's guide
ক্ষেত্রে প্রযোজ্য।
আরো দেখুন
- TensorFlow গাইডে বিতরণকৃত প্রশিক্ষণ উপলব্ধ বিতরণ কৌশলগুলির একটি ওভারভিউ প্রদান করে।
- অফিসিয়াল মডেল , যার মধ্যে অনেকগুলি একাধিক বিতরণ কৌশল চালানোর জন্য কনফিগার করা যেতে পারে।
- গাইডের পারফরম্যান্স বিভাগটি আপনার টেনসরফ্লো মডেলের পারফরম্যান্স অপ্টিমাইজ করতে আপনি ব্যবহার করতে পারেন এমন অন্যান্য কৌশল এবং সরঞ্জাম সম্পর্কে তথ্য সরবরাহ করে।