عرض على TensorFlow.org | تشغيل في Google Colab | عرض المصدر على جيثب | تحميل دفتر |
ملخص
يعد تدريب خادم المعلمات طريقة موازية للبيانات الشائعة لتوسيع نطاق تدريب النموذج على أجهزة متعددة.
تتكون مجموعة تدريب خادم المعلمات من العمال وخوادم المعلمات . يتم إنشاء المتغيرات على خوادم المعلمات ويتم قراءتها وتحديثها من قبل العاملين في كل خطوة. بشكل افتراضي ، يقوم العمال بقراءة هذه المتغيرات وتحديثها بشكل مستقل دون المزامنة مع بعضهم البعض. هذا هو السبب في أن التدريب على نمط خادم المعامل يسمى أحيانًا التدريب غير المتزامن .
في TensorFlow 2 ، يتم تشغيل تدريب خادم المعلمات بواسطة فئة tf.distribute.experimental.ParameterServerStrategy
، التي توزع خطوات التدريب على نظام مجموعة يصل إلى آلاف العمال (مصحوبًا بخوادم المعلمات).
طرق التدريب المدعومة
هناك طريقتان رئيسيتان للتدريب:
- واجهة برمجة تطبيقات
Model.fit
، والتي يوصى بها عندما تفضل التجريد عالي المستوى والتعامل مع التدريب. - حلقة تدريب مخصصة (يمكنك الرجوع إلى تدريب مخصص ، وكتابة حلقة تدريب من البداية وحلقة تدريب مخصصة مع Keras و MultiWorkerMirroredStrategy لمزيد من التفاصيل.) يوصى بتدريب الحلقة المخصصة عندما تفضل تحديد تفاصيل حلقة التدريب الخاصة بهم.
كتلة بالوظائف والمهام
بغض النظر عن واجهة برمجة التطبيقات التي تختارها ( Model.fit
أو حلقة تدريب مخصصة) ، يتضمن التدريب الموزع في TensorFlow 2: 'cluster'
بها 'jobs'
متعددة ، وقد يكون لكل وظيفة 'tasks'
واحدة أو أكثر.
عند استخدام تدريب خادم المعلمات ، يوصى بالحصول على:
- وظيفة منسق واحدة (لها اسم وظيفي
chief
) - وظائف متعددة للعمال (اسم الوظيفة
worker
) ؛ و - وظائف خادم متعدد المعلمات (اسم الوظيفة
ps
)
بينما ينشئ المنسق الموارد ، ويرسل مهام التدريب ، ويكتب نقاط التفتيش ، ويتعامل مع حالات فشل المهام ، يقوم العاملون وخوادم المعلمات بتشغيل tf.distribute.Server
الذي يستمع إلى الطلبات من المنسق.
تدريب خادم المعلمات مع Model.fit
API
يتطلب تدريب خادم المعلمات باستخدام Model.fit
API أن يستخدم المنسق كائن tf.distribute.experimental.ParameterServerStrategy
و tf.keras.utils.experimental.DatasetCreator
كمدخل. على غرار استخدام Model.fit
بدون إستراتيجية ، أو مع إستراتيجيات أخرى ، يتضمن سير العمل إنشاء النموذج وتجميعه ، وإعداد عمليات الاسترجاعات ، متبوعة باستدعاء Model.fit
.
تدريب خادم المعلمات مع حلقة تدريب مخصصة
مع حلقات التدريب المخصصة ، فإن فئة tf.distribute.experimental.coordinator.ClusterCoordinator
هي المكون الرئيسي المستخدم للمنسق.
- تحتاج فئة
ClusterCoordinator
إلى العمل مع كائنtf.distribute.Strategy
. - هذا الكائن
tf.distribute.Strategy
لتوفير معلومات الكتلة ويستخدم لتحديد خطوة التدريب ، كما هو موضح في التدريب المخصص باستخدام tf.distribute.Strategy . - يرسل كائن
ClusterCoordinator
بعد ذلك تنفيذ خطوات التدريب هذه إلى العاملين عن بُعد. - لتدريب خادم المعلمات ، يحتاج
ClusterCoordinator
إلى العمل معtf.distribute.experimental.ParameterServerStrategy
.
أهم واجهة برمجة تطبيقات يوفرها كائن ClusterCoordinator
هي schedule
:
- تقوم واجهة برمجة التطبيقات (API)
schedule
بإدراج دالة tf في قائمة الانتظار وإرجاع قيمةtf.function
شبيهةRemoteValue
على الفور. - سيتم إرسال وظائف قائمة الانتظار إلى العاملين عن بعد في مؤشرات الترابط في الخلفية وسيتم ملء
RemoteValue
الخاصة بهم بشكل غير متزامن. - نظرًا لأن
schedule
لا يتطلب تعيين العامل ، يمكن تنفيذtf.function
التي تم تمريرها على أي عامل متاح. - إذا أصبح العامل الذي تم تنفيذه عليه غير متاح قبل اكتماله ، فستتم إعادة محاولة الوظيفة على عامل آخر متاح.
- بسبب هذه الحقيقة وحقيقة أن تنفيذ الوظيفة ليس ذريًا ، يمكن تنفيذ الوظيفة أكثر من مرة.
بالإضافة إلى إرسال الوظائف البعيدة ، يساعد ClusterCoordinator
أيضًا في إنشاء مجموعات بيانات عن جميع العمال وإعادة بناء مجموعات البيانات هذه عندما يتعافى العامل من الفشل.
إعداد البرنامج التعليمي
سيتفرع البرنامج التعليمي إلى Model.fit
ومسارات حلقة تدريب مخصصة ، ويمكنك اختيار المسار الذي يناسب احتياجاتك. تنطبق الأقسام الأخرى بخلاف "التدريب مع X" على كلا المسارين.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
إعداد الكتلة
كما هو مذكور أعلاه ، تتطلب مجموعة تدريب خادم المعلمات مهمة منسق تقوم بتشغيل برنامج التدريب الخاص بك ، واحد أو أكثر من العمال ومهام خادم المعلمات التي تقوم بتشغيل خوادم TensorFlow - tf.distribute.Server
- وربما مهمة تقييم إضافية تقوم بإجراء تقييم جانبي للسيارة (انظر قسم تقييم السيارة الجانبية أدناه). متطلبات إعدادها هي:
- تحتاج مهمة المنسق إلى معرفة عناوين ومنافذ جميع خوادم TensorFlow الأخرى باستثناء المقيم.
- يحتاج العمال وخوادم المعلمات إلى معرفة المنفذ الذي يحتاجون إليه للاستماع إليه. من أجل البساطة ، يمكنك عادةً تمرير معلومات المجموعة الكاملة عند إنشاء خوادم TensorFlow في هذه المهام.
- ليس من الضروري أن تعرف مهمة المقيم تكوين مجموعة التدريب. إذا حدث ذلك ، فلا يجب محاولة الاتصال بمجموعة التدريب.
- يجب أن يكون للعمال وخوادم المعلمات أنواع مهام مثل
"worker"
و"ps"
، على التوالي. يجب أن يستخدم المنسق"chief"
كنوع المهمة لأسباب قديمة.
في هذا البرنامج التعليمي ، ستقوم بإنشاء مجموعة قيد التشغيل بحيث يمكن تشغيل تدريب خادم المعلمات بالكامل في Colab. سوف تتعلم كيفية إنشاء مجموعات حقيقية في قسم لاحق.
الكتلة قيد التشغيل
ستبدأ بإنشاء عدة خوادم TensorFlow مسبقًا والاتصال بها لاحقًا. لاحظ أن هذا فقط لغرض العرض التوضيحي لهذا البرنامج التعليمي ، وفي التدريب الحقيقي ، سيتم تشغيل الخوادم على أجهزة "worker"
و "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
كثيرًا ما يتم استخدام إعداد الكتلة قيد المعالجة في اختبار الوحدة ، كما هو الحال هنا .
هناك خيار آخر للاختبار المحلي وهو بدء العمليات على الجهاز المحلي — تحقق من تدريب العمال المتعددين باستخدام Keras للحصول على مثال على هذا النهج.
إنشاء إستراتيجية ParameterServer
قبل الغوص في رمز التدريب ، دعنا ننشئ مثيلًا لكائن ParameterServerStrategy
. لاحظ أن هذا ضروري بغض النظر عما إذا كنت Model.fit
أو حلقة تدريب مخصصة. سيتم شرح الوسيطة variable_partitioner
في قسم التجزئة المتغير .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
من أجل استخدام وحدات معالجة الرسومات للتدريب ، قم بتخصيص وحدات معالجة الرسومات المرئية لكل عامل. ستستخدم ParameterServerStrategy
جميع وحدات معالجة الرسومات المتاحة على كل عامل ، مع تقييد أن جميع العمال يجب أن يكون لديهم نفس عدد وحدات معالجة الرسومات المتاحة.
التجزئة المتغيرة
يشير مصطلح التجزئة المتغيرة إلى تقسيم المتغير إلى عدة متغيرات أصغر ، والتي تسمى الأجزاء . قد تكون التجزئة المتغيرة مفيدة لتوزيع حمل الشبكة عند الوصول إلى هذه الأجزاء. من المفيد أيضًا توزيع حساب وتخزين متغير عادي عبر خوادم متعددة المعلمات.
لتمكين التجزئة المتغيرة ، يمكنك المرور في variable_partitioner
عند إنشاء كائن ParameterServerStrategy
. سيتم استدعاء variable_partitioner
في كل مرة يتم فيها إنشاء متغير ومن المتوقع أن يُرجع عدد الأجزاء على طول كل بُعد من أبعاد المتغير. يتم توفير بعض variable_partitioner
الجاهزة مثل tf.distribute.experimental.partitioners.MinSizePartitioner
. يوصى باستخدام مقسمات على أساس الحجم مثل tf.distribute.experimental.partitioners.MinSizePartitioner
لتجنب تقسيم المتغيرات الصغيرة ، والتي يمكن أن يكون لها تأثير سلبي على سرعة تدريب النموذج.
عندما يتم تمرير قسم variable_partitioner
إلى الداخل وإذا قمت بإنشاء متغير مباشرة ضمن strategy.scope()
، فسيصبح نوع حاوية بخاصية variables
التي توفر الوصول إلى قائمة الأجزاء. في معظم الحالات ، سيتم تحويل هذه الحاوية تلقائيًا إلى Tensor من خلال تسلسل جميع القطع. نتيجة لذلك ، يمكن استخدامه كمتغير عادي. من ناحية أخرى ، توفر بعض طرق TensorFlow مثل tf.nn.embedding_lookup
تنفيذًا فعالاً لنوع الحاوية هذا وفي هذه الطرق سيتم تجنب التسلسل التلقائي.
يرجى الاطلاع على مستندات API الخاصة بـ tf.distribute.experimental.ParameterServerStrategy
لمزيد من التفاصيل.
التدريب مع Model.fit
يوفر Keras واجهة برمجة تطبيقات تدريب سهلة الاستخدام عبر Model.fit
التي تتعامل مع حلقة التدريب تحت الغطاء ، مع مرونة خطوات train_step
القابلة للتجاوز وعمليات رد الاتصال ، والتي توفر وظائف مثل حفظ نقاط التفتيش أو الحفظ الموجز للوحة TensorBoard. باستخدام Model.fit
، يمكن استخدام نفس رمز التدريب لاستراتيجيات أخرى بتبديل بسيط لكائن الإستراتيجية.
ادخال البيانات
Model.fit
مع تدريب خادم المعلمات أن يتم توفير بيانات الإدخال في ملف قابل للاستدعاء يأخذ وسيطة واحدة من النوع tf.distribute.InputContext
، ويعيد tf.data.Dataset
. بعد ذلك ، أنشئ كائنًا tf.keras.utils.experimental.DatasetCreator
يأخذ مثل هذا tf.distribute.InputOptions
callable
عبر وسيطة input_options
.
لاحظ أنه يوصى بتبديل البيانات وتكرارها باستخدام تدريب خادم المعلمات ، وتحديد steps_per_epoch
في fit
call حتى تعرف المكتبة حدود الفترة.
الرجاء مراجعة البرنامج التعليمي للإدخال الموزع لمزيد من المعلومات حول وسيطة InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
سيتم استدعاء الكود الموجود في dataset_fn
على جهاز الإدخال ، والذي يكون عادةً وحدة المعالجة المركزية ، على كل جهاز من الأجهزة العاملة.
بناء النموذج وتجميعه
الآن ، ستقوم بإنشاء tf.keras.Model
- نموذج تافه tf.keras.models.Sequential
لأغراض العرض التوضيحي - متبوعًا باستدعاء Model.compile
لدمج المكونات ، مثل مُحسِّن أو مقاييس أو معلمات مثل steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
عمليات الاسترجاعات والتدريب
قبل استدعاء model.fit
للتدريب الفعلي ، دعنا نجهز عمليات الاسترجاعات المطلوبة للمهام الشائعة ، مثل:
-
ModelCheckpoint
: لحفظ أوزان النموذج. -
BackupAndRestore
: للتأكد من أن تقدم التدريب يتم نسخه احتياطيًا تلقائيًا ، واستعادته إذا واجهت المجموعة عدم توفر (مثل الإحباط أو الاستباق) ؛ أو -
TensorBoard
: لحفظ تقارير التقدم في ملفات موجزة ، والتي يتم تصورها في أداة TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
الاستخدام المباشر مع ClusterCoordinator
(اختياري)
حتى إذا اخترت مسار التدريب Model.fit
، يمكنك اختياريًا إنشاء مثيل لكائن tf.distribute.experimental.coordinator.ClusterCoordinator
لجدولة الوظائف الأخرى التي ترغب في تنفيذها على العمال. راجع قسم التدريب مع حلقة تدريب مخصصة لمزيد من التفاصيل والأمثلة.
التدريب بحلقة تدريب مخصصة
يوفر استخدام حلقات تدريب مخصصة مع tf.distribute.Strategy
مرونة كبيرة في تحديد حلقات التدريب. باستخدام ParameterServerStrategy
المحددة أعلاه ( strategy
) ، ستستخدم tf.distribute.experimental.coordinator.ClusterCoordinator
لإرسال تنفيذ خطوات التدريب إلى العمال عن بعد.
بعد ذلك ، ستقوم بإنشاء نموذج ، وتحديد مجموعة بيانات ووظيفة خطوة ، كما فعلت في حلقة التدريب مع tf.distribute.Strategy
s. يمكنك العثور على مزيد من التفاصيل في البرنامج التعليمي المخصص باستخدام tf.distribute.Strategy .
لضمان الجلب المسبق الفعال لمجموعة البيانات ، استخدم واجهات برمجة تطبيقات إنشاء مجموعة البيانات الموزعة الموصى بها والمذكورة في خطوات تدريب الإرسال إلى قسم العاملين عن بُعد أدناه. تأكد أيضًا من استدعاء Strategy.run
داخل worker_fn
للاستفادة الكاملة من وحدات معالجة الرسومات المخصصة للعاملين. باقي الخطوات هي نفسها بالنسبة للتدريب مع أو بدون وحدات معالجة الرسومات.
لنقم بإنشاء هذه المكونات في الخطوات التالية:
قم بإعداد البيانات
أولاً ، اكتب دالة تُنشئ مجموعة بيانات تتضمن منطق المعالجة المسبقة الذي تنفذه طبقات Keras المسبقة .
ستنشئ هذه الطبقات خارج dataset_fn
ولكن ستطبق التحويل داخل dataset_fn
، حيث dataset_fn
مجموعة البيانات_fn إلى tf.function
، والتي لا تسمح بإنشاء متغيرات بداخلها.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
أنشئ أمثلة على لعبة في مجموعة بيانات:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
بعد ذلك ، أنشئ مجموعة بيانات التدريب ملفوفة في dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
بناء النموذج
بعد ذلك ، قم بإنشاء النموذج والكائنات الأخرى. تأكد من إنشاء جميع المتغيرات ضمن strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
دعنا نؤكد أن استخدام FixedShardsPartitioner
قسّم جميع المتغيرات إلى جزأين وتم تعيين كل جزء لخوادم معلمات مختلفة:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
حدد خطوة التدريب
ثالثًا ، قم بإنشاء خطوة التدريب ملفوفة في tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
في وظيفة خطوة التدريب أعلاه ، يمكن أن يدعم استدعاء Strategy.run
و Strategy.reduce
في step_fn
عدة وحدات معالجة رسومات لكل عامل. إذا تم تخصيص وحدات معالجة الرسومات للعمال ، فسوف تقوم Strategy.run
بتوزيع مجموعات البيانات على عدة نسخ متماثلة.
إرسال خطوات التدريب إلى العمال عن بعد
بعد تحديد جميع العمليات الحسابية بواسطة ParameterServerStrategy
، ستستخدم فئة tf.distribute.experimental.coordinator.ClusterCoordinator
لإنشاء الموارد وتوزيع خطوات التدريب على العاملين عن بُعد.
لنقم أولاً بإنشاء كائن ClusterCoordinator
في كائن الإستراتيجية:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
ثم أنشئ مجموعة بيانات لكل عامل ومكرر. في per_worker_dataset_fn
أدناه ، يوصى بلف مجموعة dataset_fn
في strategy.distribute_datasets_from_function
. يوصى بتوزيع مجموعات البيانات_وظيفة_الوظيفة للسماح بالجلب المسبق الفعال لوحدات معالجة الرسومات بسلاسة.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
الخطوة الأخيرة هي توزيع الحساب على العاملين عن بعد باستخدام ClusterCoordinator.schedule
:
- يقوم أسلوب
schedule
بإدراج دالة tf في قائمة الانتظار وإرجاع قيمةtf.function
شبيهةRemoteValue
على الفور. سيتم إرسال الوظائف الموجودة في قائمة الانتظار إلى العمال البعيدين في مؤشرات الترابط في الخلفية وسيتم ملءRemoteValue
بشكل غير متزامن. - يمكن استخدام طريقة
join
(ClusterCoordinator.join
) للانتظار حتى يتم تنفيذ جميع الوظائف المجدولة.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
إليك كيفية جلب نتيجة RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
بدلاً من ذلك ، يمكنك تشغيل جميع الخطوات والقيام بشيء ما أثناء انتظار الانتهاء:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
للحصول على التدريب الكامل وسير عمل الخدمة لهذا المثال المحدد ، يرجى مراجعة هذا الاختبار .
المزيد حول إنشاء مجموعة البيانات
يتم إنشاء مجموعة البيانات في الكود أعلاه باستخدام ClusterCoordinator.create_per_worker_dataset
API). يقوم بإنشاء مجموعة بيانات واحدة لكل عامل وإرجاع كائن حاوية. يمكنك استدعاء التابع iter
لإنشاء مكرر لكل عامل. يحتوي المكرر لكل عامل على مكرر واحد لكل عامل وسيتم استبدال الشريحة المقابلة للعامل في وسيطة الإدخال للدالة التي تم تمريرها إلى طريقة ClusterCoordinator.schedule
قبل تنفيذ الوظيفة على عامل معين.
في الوقت الحالي ، تفترض طريقة ClusterCoordinator.schedule
أن العمال متساوون ، وبالتالي تفترض أن مجموعات البيانات الخاصة بالعاملين المختلفين هي نفسها فيما عدا أنها قد يتم خلطها بشكل مختلف إذا كانت تحتوي على عملية Dataset.shuffle
. لهذا السبب ، يوصى أيضًا بتكرار مجموعات البيانات إلى أجل غير مسمى وأن تقوم بجدولة عدد محدود من الخطوات بدلاً من الاعتماد على OutOfRangeError
من مجموعة بيانات.
ملاحظة مهمة أخرى هي أن مجموعات بيانات tf.data
لا تدعم التسلسل الضمني وإلغاء التسلسل عبر حدود المهام. لذلك من المهم إنشاء مجموعة البيانات الكاملة داخل الوظيفة التي تم تمريرها إلى ClusterCoordinator.create_per_worker_dataset
.
تقييم
هناك أكثر من طريقة لتحديد وتشغيل حلقة التقييم في التدريب الموزع. لكل منها مزاياها وعيوبها كما هو موضح أدناه. يوصى باستخدام طريقة التقييم المضمنة إذا لم يكن لديك تفضيل.
التقييم المضمن
في هذه الطريقة ، يقوم المنسق بالتناوب بين التدريب والتقييم وبالتالي يطلق عليه التقييم الداخلي .
هناك العديد من الفوائد للتقييم المضمن. فمثلا:
- يمكن أن يدعم نماذج التقييم الكبيرة ومجموعات بيانات التقييم التي لا يمكن لمهمة واحدة الاحتفاظ بها.
- يمكن استخدام نتائج التقييم لاتخاذ قرارات لتدريب الحقبة التالية.
هناك طريقتان لتنفيذ التقييم المباشر: التقييم المباشر والتقييم الموزع.
- التقييم المباشر : بالنسبة للنماذج الصغيرة ومجموعات بيانات التقييم ، يمكن للمنسق إجراء التقييم مباشرة على النموذج الموزع مع مجموعة بيانات التقييم على المنسق:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- التقييم الموزع : بالنسبة للنماذج الكبيرة أو مجموعات البيانات التي لا يمكن تشغيلها مباشرة على المنسق ، يمكن لمهمة المنسق توزيع مهام التقييم على العمال عبر أساليب
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
تقييم جانب السيارة
هناك طريقة أخرى تسمى تقييم السيارة الجانبية حيث تقوم بإنشاء مهمة مقيِّم مخصصة تقرأ نقاط التفتيش بشكل متكرر وتدير التقييم عند نقطة تفتيش أحدث. يسمح لبرنامجك التدريبي بالانتهاء مبكرًا إذا لم تكن بحاجة إلى تغيير حلقة التدريب بناءً على نتائج التقييم. ومع ذلك ، فإنه يتطلب مهمة مقيِّم إضافية ونقاط تفتيش دورية لتحريك التقييم. فيما يلي حلقة تقييم محتملة للسيارة الجانبية:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
مجموعات في العالم الحقيقي
في بيئة إنتاج حقيقية ، ستقوم بتشغيل جميع المهام في عمليات مختلفة على أجهزة مختلفة. إن أبسط طريقة لتكوين معلومات الكتلة في كل مهمة هي تعيين متغيرات البيئة "TF_CONFIG"
واستخدام tf.distribute.cluster_resolver.TFConfigClusterResolver
لتحليل "TF_CONFIG"
.
للحصول على وصف عام حول متغيرات البيئة "TF_CONFIG"
، راجع دليل التدريب الموزع .
إذا بدأت مهام التدريب الخاصة بك باستخدام Kubernetes أو قوالب تكوين أخرى ، فمن المحتمل جدًا أن هذه القوالب قد عينت بالفعل “TF_CONFIG"
لك.
اضبط متغير البيئة "TF_CONFIG"
لنفترض أن لديك 3 عمال وخادمين معاملين ، يمكن أن يكون "TF_CONFIG"
للعامل 1:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
يمكن أن يكون "TF_CONFIG"
للمقيم:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
جزء "cluster"
في سلسلة "TF_CONFIG"
أعلاه للمقيم اختياري.
إذا كنت تستخدم نفس النظام الثنائي لجميع المهام
إذا كنت تفضل تشغيل كل هذه المهام باستخدام ثنائي واحد ، فستحتاج إلى السماح لفرع البرنامج الخاص بك بأدوار مختلفة في البداية:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
يبدأ الكود التالي تشغيل خادم TensorFlow وينتظر:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
معالجة فشل المهمة
فشل العامل
tf.distribute.experimental.coordinator.ClusterCoordinator
أو Model.fit
مع الخطأ لفشل العامل. عند استرداد العامل ، سيتم استدعاء وظيفة مجموعة البيانات المقدمة مسبقًا (إما إلى ClusterCoordinator.create_per_worker_dataset
لحلقة تدريب مخصصة ، أو tf.keras.utils.experimental.DatasetCreator
لـ Model.fit
) على العمال لإعادة إنشاء مجموعات البيانات.
فشل خادم المعلمة أو المنسق
ومع ذلك ، عندما يرى المنسق خطأ خادم معلمة ، فإنه سيرفع UnavailableError
أو AbortedError
على الفور. يمكنك إعادة تشغيل المنسق في هذه الحالة. يمكن أن يصبح المنسق نفسه غير متاح أيضًا. لذلك ، يوصى باستخدام أدوات معينة حتى لا تفقد تقدم التدريب:
بالنسبة لـ
Model.fit
، يجب عليك استخدام ردBackupAndRestore
، والذي يعالج حفظ التقدم والاستعادة تلقائيًا. انظر قسم عمليات الاسترجاعات والتدريب أعلاه للحصول على مثال.للحصول على حلقة تدريب مخصصة ، يجب عليك التحقق من متغيرات النموذج بشكل دوري وتحميل متغيرات النموذج من نقطة تحقق ، إن وجدت ، قبل بدء التدريب. يمكن الاستدلال على تقدم التدريب تقريبًا من
optimizer.iterations
إذا تم تحديد نقطة تحقق للمحسن:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
إحضار RemoteValue
إحضار RemoteValue
مضمون إذا تم تنفيذ الوظيفة بنجاح. هذا لأنه يتم حاليًا نسخ قيمة الإرجاع على الفور إلى المنسق بعد تنفيذ الوظيفة. إذا كان هناك أي فشل عامل أثناء النسخ ، فستتم إعادة محاولة الوظيفة على عامل آخر متاح. لذلك ، إذا كنت ترغب في تحسين الأداء ، يمكنك جدولة الوظائف بدون قيمة مرتجعة.
الإبلاغ عن الأخطاء
بمجرد أن يرى المنسق خطأ مثل UnavailableError
من خوادم المعلمات أو أخطاء التطبيق الأخرى مثل InvalidArgument
من tf.debugging.check_numerics
، فإنه سيلغي جميع الوظائف المعلقة والقائمة في قائمة الانتظار قبل رفع الخطأ. سيؤدي إحضار RemoteValue
المقابلة لها إلى ظهور خطأ CancelledError
.
بعد ظهور خطأ ، لن يقوم المنسق بإثارة نفس الخطأ أو أي خطأ من الوظائف الملغاة.
تحسين الأداء
هناك عدة أسباب محتملة إذا رأيت مشكلات في الأداء عند التدريب باستخدام ParameterServerStrategy
و ClusterResolver
.
أحد الأسباب الشائعة هو أن خوادم المعلمات بها حمل غير متوازن وأن بعض خوادم المعلمات المحملة بكثافة قد وصلت إلى السعة. يمكن أن يكون هناك أيضًا العديد من الأسباب الجذرية. بعض الطرق البسيطة للتخفيف من هذه المشكلة هي:
- قم بتقسيم متغيرات النموذج الكبيرة الخاصة بك عن طريق تحديد
variable_partitioner
عند إنشاءParameterServerStrategy
. - تجنب إنشاء متغير نقطة فعالة مطلوبًا من قبل جميع خوادم المعلمات في خطوة واحدة إن أمكن. على سبيل المثال ، استخدم معدل تعلم ثابتًا أو فئة فرعية
tf.keras.optimizers.schedules.LearningRateSchedule
في محسنات الأداء نظرًا لأن السلوك الافتراضي هو أن معدل التعلم سيصبح متغيرًا يتم وضعه على خادم معلمات معين ويطلبه جميع خوادم المعلمات الأخرى في كل خطوة . - قم بتبديل مفرداتك الكبيرة قبل تمريرها إلى طبقات معالجة Keras المسبقة.
سبب آخر محتمل لقضايا الأداء هو المنسق. يعتمد تنفيذك الأول schedule
/ join
على لغة Python ، وبالتالي قد يكون هناك عبء على مؤشر الترابط. كما يمكن أن يكون زمن الانتقال بين المنسق والعاملين كبيرًا. اذا كانت هذه القضيه،
بالنسبة لـ
Model.fit
، يمكنك تعيين وسيطةsteps_per_execution
المقدمة فيModel.compile
إلى قيمة أكبر من 1.للحصول على حلقة تدريب مخصصة ، يمكنك تجميع خطوات متعددة في وظيفة
tf.function
واحدة:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
مع تحسين المكتبة بشكل أكبر ، نأمل ألا يضطر معظم المستخدمين إلى حزم الخطوات يدويًا في المستقبل.
بالإضافة إلى ذلك ، هناك حيلة صغيرة لتحسين الأداء تتمثل في جدولة الوظائف بدون قيمة مرتجعة كما هو موضح في قسم فشل مهمة المعالجة أعلاه.
القيود المعروفة
تمت تغطية معظم القيود المعروفة بالفعل في الأقسام أعلاه. يقدم هذا القسم ملخصا.
ParameterServerStrategy
العامة
-
os.environment["grpc_fail_fast"]="use_caller"
مطلوب في كل مهمة بما في ذلك المنسق ، لجعل التسامح مع الخطأ يعمل بشكل صحيح. - تدريب خادم المعامل المتزامن غير مدعوم.
- عادة ما يكون من الضروري تجميع خطوات متعددة في وظيفة واحدة لتحقيق الأداء الأمثل.
- لا يتم دعم تحميل نموذج_محفوظ عبر
tf.saved_model.load
يحتوي على متغيرات مُقسمة. لاحظ أن تحميل مثل هذا النموذج المحفوظ باستخدام خدمة TensorFlow من المتوقع أن يعمل. - لا يتم دعم تحميل نقطة اختبار تحتوي على متغيرات فتحة المُحسِّن المُقسمة إلى عدد مختلف من الأجزاء.
- لا يتم دعم الاسترداد من فشل خادم المعلمات بدون إعادة تشغيل مهمة المنسق.
- ينتج عن استخدام
tf.lookup.StaticHashTable
(الذي يتم استخدامه بشكل شائع في بعض طبقات معالجة Keras ، مثلtf.keras.layers.IntegerLookup
وtf.keras.layers.StringLookup
وtf.keras.layers.TextVectorization
) موارد موضوعة على المنسق في هذا الوقت مع تدريب خادم المعلمات. هذا له آثار على الأداء للبحث عن RPCs من العاملين إلى المنسق. هذه هي الأولوية القصوى الحالية لمعالجتها.
تفاصيل Model.fit
- مطلوب وسيطة
steps_per_epoch
فيModel.fit
. يمكنك تحديد قيمة توفر فترات زمنية مناسبة في حقبة ما. - لا يحتوي
ParameterServerStrategy
على دعم عمليات الاسترجاعات المخصصة التي تحتوي على مكالمات على مستوى الدُفعات لأسباب تتعلق بالأداء. يجب عليك تحويل هذه المكالمات إلى مكالمات على مستوى الفترة الزمنية باستخدامsteps_per_epoch
بشكل مناسب ، بحيث يتم استدعاؤها في كل عددsteps_per_epoch
من الخطوات. لا تتأثر عمليات الاسترجاعات المضمنة: تم تعديل مكالماتها على مستوى المجموعة لتكون فعالة. يتم الآن التخطيط لدعم استدعاءات مستوى المجموعة لـParameterServerStrategy
. - للسبب نفسه ، بخلاف الاستراتيجيات الأخرى ، يتم تسجيل شريط التقدم والمقاييس فقط في حدود الفترة الزمنية.
-
run_eagerly
غير معتمد.
تفاصيل حلقة التدريب المخصصة
- لا يدعم
ClusterCoordinator.schedule
ضمانات الزيارة لمجموعة البيانات. - عند استخدام
ClusterCoordinator.create_per_worker_dataset
، يجب إنشاء مجموعة البيانات بأكملها داخل الوظيفة التي تم تمريرها إليها. -
tf.data.Options
يتم تجاهل الخيارات في مجموعة بيانات تم إنشاؤها بواسطةClusterCoordinator.create_per_worker_dataset
.