مشاهده در TensorFlow.org | در Google Colab اجرا شود | مشاهده منبع در GitHub | دانلود دفترچه یادداشت |
بررسی اجمالی
آموزش سرور پارامتر یک روش متداول داده موازی برای افزایش مقیاس آموزش مدل در چندین ماشین است.
یک خوشه آموزشی سرور پارامتر شامل کارگران و سرورهای پارامتر است. متغیرها بر روی سرورهای پارامتر ایجاد می شوند و در هر مرحله توسط کارگران خوانده و به روز می شوند. به طور پیش فرض، کارگران این متغیرها را به طور مستقل و بدون همگام سازی با یکدیگر می خوانند و به روز می کنند. به همین دلیل است که گاهی اوقات آموزش به سبک سرور پارامتر را آموزش ناهمزمان می نامند.
در TensorFlow 2، آموزش سرور پارامتر توسط کلاس tf.distribute.experimental.ParameterServerStrategy
، که مراحل آموزش را در یک خوشه توزیع می کند که تا هزاران کارگر (همراه با سرورهای پارامتر) مقیاس می شود.
روش های آموزشی پشتیبانی شده
دو روش اصلی آموزشی پشتیبانی شده وجود دارد:
- Keras
Model.fit
API، که زمانی توصیه میشود که انتزاع سطح بالا و مدیریت آموزش را ترجیح میدهید. - یک حلقه آموزشی سفارشی (برای جزئیات بیشتر می توانید به آموزش سفارشی ، نوشتن یک حلقه آموزشی از ابتدا و حلقه آموزشی سفارشی با Keras و MultiWorkerMirroredStrategy مراجعه کنید.) زمانی که ترجیح می دهید جزئیات حلقه آموزشی آنها را تعریف کنید، آموزش حلقه سفارشی توصیه می شود.
خوشه ای با مشاغل و وظایف
صرف نظر از API انتخابی ( Model.fit
یا یک حلقه آموزشی سفارشی)، آموزش توزیع شده در TensorFlow 2 شامل: یک 'cluster'
با چندین 'jobs'
است و هر یک از کارها ممکن است یک یا چند 'tasks'
داشته باشد.
هنگام استفاده از آموزش سرور پارامتر، توصیه می شود:
- یک شغل هماهنگ کننده (که دارای نام شغلی
chief
است) - چندین شغل کارگری (نام شغل
worker
)؛ و - کارهای سرور چند پارامتری (نام شغل
ps
)
در حالی که هماهنگ کننده منابع ایجاد می کند، وظایف آموزشی را ارسال می کند، نقاط بازرسی می نویسد، و با خرابی کارها سروکار دارد، کارگران و سرورهای پارامتر tf.distribute.Server
را اجرا می کنند که به درخواست های هماهنگ کننده گوش می دهند.
آموزش سرور پارامتر با Model.fit
API
آموزش سرور پارامتر با Model.fit
API به هماهنگ کننده نیاز دارد که از یک شی tf.distribute.experimental.ParameterServerStrategy
و یک tf.keras.utils.experimental.DatasetCreator
به عنوان ورودی استفاده کند. مشابه استفاده از Model.fit
بدون استراتژی، یا با سایر استراتژیها، گردش کار شامل ایجاد و کامپایل مدل، آمادهسازی تماسهای برگشتی و به دنبال آن Model.fit
است.
آموزش سرور پارامتر با حلقه آموزشی سفارشی
با حلقه های آموزشی سفارشی، کلاس tf.distribute.experimental.coordinator.ClusterCoordinator
جزء کلیدی مورد استفاده برای هماهنگ کننده است.
- کلاس
ClusterCoordinator
باید در ارتباط با یک شیtf.distribute.Strategy
کند. - این شی
tf.distribute.Strategy
برای ارائه اطلاعات خوشه مورد نیاز است و برای تعریف مرحله آموزشی استفاده می شود، همانطور که در آموزش سفارشی با tf.distribute.Strategy نشان داده شده است. - شی
ClusterCoordinator
سپس اجرای این مراحل آموزشی را به کارگران راه دور ارسال می کند. - برای آموزش سرور پارامتر،
ClusterCoordinator
باید باtf.distribute.experimental.ParameterServerStrategy
کار کند.
مهم ترین API ارائه شده توسط شی ClusterCoordinator
schedule
است:
- API
schedule
زمانی یکtf.function
را در صف قرار می دهد و بلافاصله یکRemoteValue
شبیه به آینده را برمی گرداند. - توابع در صف به کارگران راه دور در رشته های پس زمینه فرستاده می شوند و
RemoteValue
آنها به صورت ناهمزمان پر می شود. - از آنجایی که
schedule
زمان بندی نیازی به تخصیص کارگر ندارد، تابعtf.function
منتقل شده می تواند بر روی هر کارگر موجود اجرا شود. - اگر کارگری که روی آن اجرا میشود قبل از تکمیل آن در دسترس نباشد، این تابع روی کارگر موجود دیگری دوباره امتحان میشود.
- به دلیل این واقعیت و این واقعیت که اجرای تابع اتمی نیست، یک تابع ممکن است بیش از یک بار اجرا شود.
علاوه بر ارسال توابع راه دور، ClusterCoordinator
همچنین به ایجاد مجموعه دادهها بر روی همه کارگران و بازسازی این مجموعه دادهها در زمانی که یک کارگر از شکست بهبود مییابد، کمک میکند.
راه اندازی آموزش
این آموزش به Model.fit
و مسیرهای حلقه آموزشی سفارشی منشعب می شود و شما می توانید متناسب با نیاز خود یکی را انتخاب کنید. بخش هایی غیر از "آموزش با X" برای هر دو مسیر قابل اجرا هستند.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
راه اندازی خوشه
همانطور که در بالا ذکر شد، یک کلاستر آموزشی سرور پارامتر به یک کار هماهنگ کننده نیاز دارد که برنامه آموزشی شما را اجرا می کند، یک یا چند کارگر و وظایف سرور پارامتر که سرورهای tf.distribute.Server
- tf.distribute.Server - و احتمالاً یک کار ارزیابی اضافی که ارزیابی ماشین جانبی را اجرا می کند، نیاز دارد. (به بخش ارزیابی خودروی جانبی در زیر مراجعه کنید). الزامات تنظیم آنها عبارتند از:
- وظیفه هماهنگ کننده باید آدرس ها و پورت های سایر سرورهای TensorFlow را به جز ارزیاب بداند.
- کارگران و سرورهای پارامتر باید بدانند که به کدام پورت باید گوش دهند. برای سادگی، معمولاً هنگام ایجاد سرورهای TensorFlow در این وظایف، میتوانید اطلاعات خوشهای کامل را وارد کنید.
- وظیفه ارزیاب نیازی به دانستن تنظیمات خوشه آموزشی ندارد. اگر این کار را کرد، نباید سعی کند به کلاستر آموزشی متصل شود.
- کارگران و سرورهای پارامتر باید به ترتیب دارای انواع وظایف به عنوان
"worker"
و"ps"
باشند. هماهنگ کننده باید به دلایل قدیمی از"chief"
به عنوان نوع کار استفاده کند.
در این آموزش شما یک کلاستر در فرآیند ایجاد می کنید تا کل آموزش سرور پارامتر در Colab اجرا شود. نحوه راه اندازی کلاسترهای واقعی را در بخش بعدی یاد خواهید گرفت.
خوشه در حال فرآیند
شما با ایجاد چندین سرور TensorFlow از قبل شروع کرده و بعداً به آنها متصل خواهید شد. توجه داشته باشید که این فقط برای نمایش این آموزش است و در آموزش واقعی سرورها بر روی ماشین های "worker"
و "ps"
راه اندازی می شوند.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
تنظیم خوشه در فرآیند اغلب در آزمایش واحد استفاده می شود، مانند اینجا .
یکی دیگر از گزینههای آزمایش محلی، راهاندازی فرآیندها در ماشین محلی است—برای نمونهای از این رویکرد ، آموزش Multi-worker با Keras را بررسی کنید.
یک ParameterServerStrategy را نمونه سازی کنید
قبل از اینکه وارد کد آموزشی شوید، بیایید یک شی ParameterServerStrategy
را نمونه سازی کنیم. توجه داشته باشید که این بدون در نظر گرفتن اینکه آیا شما با Model.fit
یا یک حلقه آموزشی سفارشی ادامه می دهید، مورد نیاز است. آرگومان variable_partitioner
در بخش Variable Sharding توضیح داده خواهد شد.
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
به منظور استفاده از GPU برای آموزش، GPUهای قابل مشاهده برای هر کارگر را اختصاص دهید. ParameterServerStrategy
از تمام GPU های موجود در هر کارگر استفاده می کند، با این محدودیت که همه کارگران باید به همان تعداد GPU در دسترس باشند.
اشتراک گذاری متغیر
تقسیم بندی متغیر به تقسیم یک متغیر به چندین متغیر کوچکتر اشاره دارد که به آن هارد می گویند . اشتراک گذاری متغیر ممکن است برای توزیع بار شبکه هنگام دسترسی به این خرده ها مفید باشد. همچنین توزیع محاسبات و ذخیره سازی یک متغیر عادی در سرورهای چند پارامتری مفید است.
برای فعال کردن اشتراک گذاری variable_partitioner
، می توانید هنگام ساخت یک شی ParameterServerStrategy
، یک متغیر_partitioner را ارسال کنید. متغیر_partitioner هر بار که یک variable_partitioner
ایجاد میشود فراخوانی میشود و انتظار میرود که تعداد خردهها را در هر بعد از متغیر برگرداند. برخی از variable_partitioner
partitioner خارج از جعبه مانند tf.distribute.experimental.partitioners.MinSizePartitioner
ارائه شده است. توصیه می شود از پارتیشن های مبتنی بر اندازه مانند tf.distribute.experimental.partitioners.MinSizePartitioner
برای جلوگیری از پارتیشن بندی متغیرهای کوچک استفاده کنید که می تواند تأثیر منفی بر سرعت آموزش مدل داشته باشد.
هنگامی که یک variable_partitioner
ارسال میشود و اگر یک متغیر را مستقیماً تحت strategy.scope()
ایجاد کنید، به یک نوع ظرف با ویژگی variables
تبدیل میشود که دسترسی به لیست خردهها را فراهم میکند. در بیشتر موارد، این ظرف به طور خودکار با به هم پیوستن تمام خرده ها به یک Tensor تبدیل می شود. در نتیجه می توان از آن به عنوان یک متغیر عادی استفاده کرد. از سوی دیگر، برخی از روشهای TensorFlow مانند tf.nn.embedding_lookup
پیادهسازی کارآمدی را برای این نوع کانتینر فراهم میکنند و در این روشها از الحاق خودکار اجتناب میشود.
لطفاً برای جزئیات بیشتر به اسناد API tf.distribute.experimental.ParameterServerStrategy
مراجعه کنید.
آموزش با Model.fit
Keras یک API آموزشی با کاربرد آسان را از طریق Model.fit
که حلقه آموزشی زیر کاپوت را کنترل میکند، با انعطافپذیری train_step
، و تماسهای برگشتی، که قابلیتهایی مانند ذخیره نقطه بازرسی یا ذخیره خلاصه برای TensorBoard را فراهم میکند. با Model.fit
، می توان از همان کد آموزشی برای استراتژی های دیگر با تعویض ساده شی استراتژی استفاده کرد.
داده های ورودی
Model.fit
با آموزش سرور پارامتر مستلزم آن است که داده های ورودی در یک فراخوانی ارائه شود که یک آرگومان منفرد از نوع tf.distribute.InputContext
را می گیرد و یک tf.data.Dataset
را برمی گرداند. سپس، یک شی tf.keras.utils.experimental.DatasetCreator
که چنین callable
فراخوانی را می گیرد و یک شی اختیاری tf.distribute.InputOptions
را از طریق آرگومان input_options
کنید.
توجه داشته باشید که توصیه میشود دادهها را با آموزش سرور پارامتر به هم بزنید و تکرار کنید و در فراخوانی fit
، steps_per_epoch
را مشخص کنید تا کتابخانه مرزهای دوره را بشناسد.
لطفاً برای اطلاعات بیشتر در مورد آرگومان InputContext
به آموزش ورودی توزیع شده مراجعه کنید.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
کد موجود در dataset_fn
در دستگاه ورودی که معمولاً CPU است، در هر یک از ماشینهای کارگر فراخوانی میشود.
ساخت و تدوین مدل
اکنون، یک tf.keras.Model
-یک مدل tf.keras.models.Sequential
بیاهمیت برای اهداف نمایشی ایجاد میکنید که با یک Model.compile
برای ترکیب اجزایی مانند بهینهساز، معیارها، یا پارامترهایی مانند steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
پاسخ به تماس و آموزش
قبل از اینکه model.fit
را برای آموزش واقعی فراخوانی کنید، بیایید فراخوان های مورد نیاز را برای کارهای رایج آماده کنیم، مانند:
-
ModelCheckpoint
: برای ذخیره وزن مدل. -
BackupAndRestore
: برای اطمینان از اینکه از پیشرفت آموزش به طور خودکار نسخه پشتیبان تهیه می شود و در صورت عدم دسترسی به خوشه (مانند سقط یا preemption) بازیابی می شود. یا -
TensorBoard
: برای ذخیره گزارش های پیشرفت در فایل های خلاصه، که در ابزار TensorBoard تجسم می شوند.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
استفاده مستقیم با ClusterCoordinator
(اختیاری)
حتی اگر مسیر آموزشی Model.fit
را انتخاب کنید، میتوانید به صورت اختیاری یک شی tf.distribute.experimental.coordinator.ClusterCoordinator
را برای برنامهریزی سایر توابع که میخواهید روی کارگران اجرا شوند، نمونهسازی کنید. برای جزئیات و مثال های بیشتر به بخش آموزش با حلقه آموزش سفارشی مراجعه کنید.
آموزش با یک حلقه آموزشی سفارشی
استفاده از حلقه های آموزشی سفارشی با tf.distribute.Strategy
انعطاف پذیری زیادی را برای تعریف حلقه های آموزشی فراهم می کند. با ParameterServerStrategy
که در بالا تعریف شده است (به عنوان strategy
)، از tf.distribute.experimental.coordinator.ClusterCoordinator
برای ارسال اجرای مراحل آموزشی به کارگران راه دور استفاده خواهید کرد.
سپس، همانطور که در حلقه آموزشی با سایر tf.distribute.Strategy
انجام داده اید، یک مدل ایجاد می کنید، یک مجموعه داده و یک تابع مرحله تعریف می کنید. جزئیات بیشتر را می توانید در آموزش سفارشی با آموزش tf.distribute.Strategy بیابید .
برای اطمینان از واکشی کارآمد مجموعه داده، از APIهای ایجاد مجموعه داده توزیع شده توصیه شده که در مراحل آموزش اعزام به کارگران راه دور در زیر ذکر شده است استفاده کنید. همچنین، حتماً با Strategy.run
در داخل worker_fn
تماس بگیرید تا از مزایای کامل پردازندههای گرافیکی اختصاص داده شده به کارگران استفاده کنید. بقیه مراحل برای آموزش با یا بدون GPU یکسان است.
بیایید این اجزا را در مراحل زیر ایجاد کنیم:
داده ها را تنظیم کنید
ابتدا، تابعی بنویسید که مجموعه داده ای را ایجاد می کند که شامل منطق پیش پردازش پیاده سازی شده توسط لایه های پیش پردازش Keras است .
شما این لایهها را خارج از dataset_fn
ایجاد میکنید، اما تبدیل را در داخل مجموعه dataset_fn
اعمال میکنید، زیرا مجموعه dataset_fn
را در یک tf.function
قرار میدهید، که اجازه نمیدهد متغیرها در داخل آن ایجاد شوند.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
نمونه های اسباب بازی را در یک مجموعه داده ایجاد کنید:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
سپس، مجموعه داده آموزشی پیچیده شده در یک dataset_fn
را ایجاد کنید:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
مدل را بسازید
بعد، مدل و سایر اشیاء را ایجاد کنید. مطمئن شوید که همه متغیرها را تحت strategy.scope
ایجاد کرده اید.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
اجازه دهید تأیید کنیم که استفاده از FixedShardsPartitioner
همه متغیرها را به دو قطعه تقسیم میکند و هر قطعه به سرورهای پارامتر متفاوتی اختصاص مییابد:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
مرحله آموزش را تعریف کنید
سوم، مرحله آموزشی را در یک tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
در تابع مرحله آموزشی بالا، فراخوانی Strategy.run
و Strategy.reduce
در step_fn
میتواند چندین GPU را برای هر کارگر پشتیبانی کند. اگر به کارگران GPU تخصیص داده شده باشد، Strategy.run
مجموعه داده ها را بر روی چندین نسخه توزیع می کند.
اعزام مراحل آموزشی به کارگران از راه دور
پس از اینکه همه محاسبات توسط ParameterServerStrategy
تعریف شدند، از کلاس tf.distribute.experimental.coordinator.ClusterCoordinator
برای ایجاد منابع و توزیع مراحل آموزشی برای کارگران راه دور استفاده خواهید کرد.
بیایید ابتدا یک شی ClusterCoordinator
ایجاد کنیم و شی استراتژی را وارد کنیم:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
سپس، یک مجموعه داده برای هر کارگر و یک تکرار کننده ایجاد کنید. در per_worker_dataset_fn
زیر، قرار دادن مجموعه dataset_fn
در strategy.distribute_datasets_from_function
توصیه میشود تا امکان واکشی اولیه کارآمد به GPUها را فراهم کند.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
مرحله آخر توزیع محاسبات بین کارگران راه دور با استفاده از ClusterCoordinator.schedule
:
- متد
schedule
یکtf.function
را در صف قرار می دهد و بلافاصله یکRemoteValue
شبیه آینده را برمی گرداند. توابع صف به کارمندان راه دور در رشته های پس زمینه فرستاده می شوند وRemoteValue
به صورت ناهمزمان پر می شود. - از متد
join
(ClusterCoordinator.join
) می توان برای صبر کردن تا اجرای تمام توابع برنامه ریزی شده استفاده کرد.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
در اینجا نحوه واکشی نتیجه RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
از طرف دیگر، می توانید تمام مراحل را راه اندازی کنید و در حالی که منتظر تکمیل هستید، کاری انجام دهید:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
برای آموزش کامل و گردش کار برای این مثال خاص، لطفاً این تست را بررسی کنید.
اطلاعات بیشتر در مورد ایجاد مجموعه داده
مجموعه داده در کد بالا با استفاده از ClusterCoordinator.create_per_worker_dataset
API ایجاد شده است. برای هر کارگر یک مجموعه داده ایجاد می کند و یک شی ظرف را برمی گرداند. میتوانید متد iter
را برای ایجاد یک تکرارکننده برای هر کارگر فراخوانی کنید. تکرارکننده هر کارگر شامل یک تکرارکننده به ازای هر کارگر است و قطعه مربوط به یک کارگر در آرگومان ورودی تابع که قبل از اجرای تابع بر روی یک کارگر خاص به متد ClusterCoordinator.schedule
، جایگزین میشود.
در حال حاضر، روش ClusterCoordinator.schedule
فرض میکند که کارگران معادل هستند و بنابراین فرض میکند که مجموعه دادهها روی کارگران مختلف یکسان هستند، با این تفاوت که اگر حاوی یک عملیات Dataset.shuffle
باشند، ممکن است بهطور متفاوتی با هم ترکیب شوند. به همین دلیل، همچنین توصیه می شود که مجموعه داده ها به طور نامحدود تکرار شوند و به جای تکیه بر OutOfRangeError
از یک مجموعه داده، تعداد محدودی از مراحل را برنامه ریزی کنید.
نکته مهم دیگر این است که مجموعه دادههای tf.data
از سریالسازی و سریالزدایی ضمنی در سراسر مرزهای وظیفه پشتیبانی نمیکنند. بنابراین مهم است که کل مجموعه داده را در داخل تابع ارسال شده به ClusterCoordinator.create_per_worker_dataset
کنید.
ارزیابی
بیش از یک راه برای تعریف و اجرای یک حلقه ارزیابی در آموزش توزیع شده وجود دارد. هر کدام مزایا و معایب خاص خود را دارند که در زیر توضیح داده شده است. اگر ترجیحی ندارید، روش ارزیابی درون خطی توصیه می شود.
ارزیابی درون خطی
در این روش هماهنگ کننده بین آموزش و ارزشیابی متناوب می شود و از این رو به آن ارزشیابی درون خطی می گویند.
چندین مزیت ارزیابی درون خطی وجود دارد. مثلا:
- میتواند از مدلهای ارزیابی بزرگ و مجموعه دادههای ارزیابی که یک وظیفه نمیتواند انجام دهد، پشتیبانی کند.
- از نتایج ارزیابی می توان برای تصمیم گیری برای آموزش دوره بعدی استفاده کرد.
دو روش برای اجرای ارزیابی درون خطی وجود دارد: ارزیابی مستقیم و ارزیابی توزیع شده.
- ارزیابی مستقیم : برای مدلهای کوچک و مجموعه دادههای ارزیابی، هماهنگکننده میتواند ارزیابی را مستقیماً روی مدل توزیعشده با مجموعه داده ارزیابی روی هماهنگکننده اجرا کند:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- ارزیابی توزیعشده : برای مدلها یا مجموعههای داده بزرگ که اجرای مستقیم روی هماهنگکننده غیرممکن است، وظیفه هماهنگکننده میتواند وظایف ارزیابی را از طریق روشهای
ClusterCoordinator.schedule
/ClusterCoordinator.join
بین کارگران توزیع کند:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
ارزیابی خودروی جانبی
روش دیگری به نام ارزیابی خودروی جانبی است که در آن یک کار ارزیاب اختصاصی ایجاد میکنید که به طور مکرر نقاط بازرسی را میخواند و ارزیابی را در آخرین ایست بازرسی انجام میدهد. اگر نیازی به تغییر حلقه آموزشی خود بر اساس نتایج ارزیابی ندارید، این امکان را به برنامه آموزشی شما می دهد تا زودتر تمام شود. با این حال، برای شروع ارزیابی به یک کار ارزیاب اضافی و بازرسی دوره ای نیاز دارد. در زیر یک حلقه ارزیابی احتمالی خودروی جانبی وجود دارد:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
خوشه ها در دنیای واقعی
در یک محیط تولید واقعی، شما تمامی وظایف را در فرآیندهای مختلف بر روی ماشین های مختلف اجرا خواهید کرد. ساده ترین راه برای پیکربندی اطلاعات کلاستر در هر کار، تنظیم متغیرهای محیطی "TF_CONFIG"
و استفاده از tf.distribute.cluster_resolver.TFConfigClusterResolver
برای تجزیه "TF_CONFIG"
است.
برای توضیحات کلی در مورد متغیرهای محیطی "TF_CONFIG"
، به راهنمای آموزشی Distributed مراجعه کنید.
اگر کارهای آموزشی خود را با استفاده از Kubernetes یا سایر قالب های پیکربندی شروع کنید، به احتمال زیاد این الگوها قبلاً “TF_CONFIG"
را برای شما تنظیم کرده اند.
متغیر محیطی "TF_CONFIG"
را تنظیم کنید
فرض کنید 3 کارگر و 2 سرور پارامتر دارید، "TF_CONFIG"
کارگر 1 می تواند باشد:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
"TF_CONFIG"
ارزیاب می تواند:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
بخش "cluster"
در رشته "TF_CONFIG"
بالا برای ارزیاب اختیاری است.
اگر از یک باینری برای همه کارها استفاده کنید
اگر ترجیح می دهید همه این وظایف را با استفاده از یک باینری واحد اجرا کنید، باید در همان ابتدا اجازه دهید برنامه خود به نقش های مختلف منشعب شود:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
کد زیر یک سرور TensorFlow را راه اندازی می کند و منتظر می ماند:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
رسیدگی به شکست کار
شکست کارگر
tf.distribute.experimental.coordinator.ClusterCoordinator
یا Model.fit
تحمل خطای داخلی را برای خرابی کارگر فراهم می کند. پس از بازیابی کارگر، تابع مجموعه دادهای که قبلا ارائه شده بود (یا به ClusterCoordinator.create_per_worker_dataset
برای یک حلقه آموزشی سفارشی، یا tf.keras.utils.experimental.DatasetCreator
برای Model.fit
) روی کارگران برای ایجاد مجدد مجموعههای داده فراخوانی میشود.
سرور پارامتر یا هماهنگ کننده خراب است
با این حال، هنگامی که هماهنگ کننده یک خطای سرور پارامتر را می بیند، بلافاصله یک UnavailableError
یا AbortedError
را ایجاد می کند. در این مورد می توانید هماهنگ کننده را مجددا راه اندازی کنید. خود هماهنگ کننده نیز ممکن است در دسترس نباشد. بنابراین، ابزار خاصی برای از دست ندادن پیشرفت آموزش توصیه می شود:
برای
Model.fit
، باید ازBackupAndRestore
callback استفاده کنید، که ذخیره و بازیابی پیشرفت را به طور خودکار مدیریت می کند. برای مثال به بخش پاسخ به تماس و آموزش در بالا مراجعه کنید.برای یک حلقه آموزشی سفارشی، باید متغیرهای مدل را به صورت دوره ای بررسی کنید و متغیرهای مدل را در صورت وجود، قبل از شروع آموزش، از یک نقطه بازرسی بارگیری کنید. اگر یک بهینهساز چک پوینت باشد، میتوان پیشرفت آموزش را تقریباً از
optimizer.iterations
استنباط کرد:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
واکشی RemoteValue
اگر یک تابع با موفقیت اجرا شود، واکشی RemoteValue
تضمین می شود که موفقیت آمیز باشد. این به این دلیل است که در حال حاضر مقدار بازگشتی بلافاصله پس از اجرای یک تابع به هماهنگ کننده کپی می شود. اگر در حین کپی هر گونه خرابی کارگر وجود داشته باشد، این تابع روی کارگر موجود دیگری امتحان می شود. بنابراین، اگر میخواهید برای عملکرد بهینهسازی کنید، میتوانید توابع را بدون مقدار بازگشتی زمانبندی کنید.
گزارش خطا
هنگامی که هماهنگ کننده خطایی مانند UnavailableError
از سرورهای پارامتر یا سایر خطاهای برنامه مانند InvalidArgument
از tf.debugging.check_numerics
را ببیند، قبل از افزایش خطا، همه توابع در انتظار و صف را لغو می کند. واکشی RemoteValue
مربوط به آنها یک CancelledError
ایجاد می کند.
پس از مطرح شدن یک خطا، هماهنگ کننده همان خطا یا خطای توابع لغو شده را مطرح نخواهد کرد.
بهبود عملکرد
اگر هنگام تمرین با ParameterServerStrategy
و ClusterResolver
، مشکلات عملکردی را مشاهده کردید، چندین دلیل ممکن وجود دارد.
یکی از دلایل رایج این است که سرورهای پارامتر دارای بار نامتعادل هستند و برخی از سرورهای پارامتر با بارگذاری سنگین به ظرفیت رسیده اند. همچنین می تواند دلایل ریشه ای متعددی داشته باشد. چند روش ساده برای کاهش این مشکل عبارتند از:
- هنگام ساخت یک
ParameterServerStrategy
، متغیرهای مدل بزرگ خود را از طریق تعیین یکvariable_partitioner
تقسیم کنید. - در صورت امکان از ایجاد یک متغیر hotspot که توسط همه سرورهای پارامتر مورد نیاز است در یک مرحله خودداری کنید. به عنوان مثال، از یک نرخ یادگیری ثابت یا زیر کلاس
tf.keras.optimizers.schedules.LearningRateSchedule
در بهینه سازها استفاده کنید زیرا رفتار پیش فرض این است که نرخ یادگیری به متغیری تبدیل می شود که روی یک سرور پارامتر خاص قرار می گیرد و توسط تمام سرورهای پارامتر دیگر در هر مرحله درخواست می شود. . - واژگان بزرگ خود را قبل از ارسال آنها به لایه های پیش پردازش Keras با هم مخلوط کنید.
یکی دیگر از دلایل احتمالی مشکلات عملکرد، هماهنگ کننده است. اولین پیادهسازی schedule
/ join
شما مبتنی بر پایتون است و بنابراین ممکن است دارای سربار نخ باشد. همچنین تأخیر بین هماهنگ کننده و کارگران می تواند زیاد باشد. اگر این مورد است،
برای
Model.fit
، می توانید آرگومانsteps_per_execution
ارائه شده درModel.compile
را روی مقداری بزرگتر از 1 تنظیم کنید.برای یک حلقه آموزشی سفارشی، می توانید چندین مرحله را در یک
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
از آنجایی که کتابخانه بیشتر بهینه شده است، امیدواریم اکثر کاربران مجبور نباشند در آینده مراحل را به صورت دستی بسته بندی کنند.
علاوه بر این، یک ترفند کوچک برای بهبود عملکرد، برنامهریزی عملکردها بدون مقدار بازگشتی است، همانطور که در بخش شکست کار مدیریت در بالا توضیح داده شد.
محدودیت های شناخته شده
بیشتر محدودیت های شناخته شده قبلاً در بخش های بالا پوشش داده شده است. این بخش خلاصه ای را ارائه می دهد.
ParameterServerStrategy
کلی
-
os.environment["grpc_fail_fast"]="use_caller"
برای هر کار از جمله هماهنگ کننده مورد نیاز است تا تحمل خطا به درستی کار کند. - آموزش سرور پارامتر همزمان پشتیبانی نمی شود.
- معمولاً برای دستیابی به عملکرد بهینه لازم است چندین مرحله در یک عملکرد واحد جمع آوری شود.
- بارگذاری saved_model از طریق
tf.saved_model.load
حاوی متغیرهای خرد شده پشتیبانی نمی شود. توجه داشته باشید که بارگیری چنین saved_model با استفاده از TensorFlow Serving انتظار می رود کار کند. - بارگذاری یک ایست بازرسی حاوی متغیرهای شکاف بهینه ساز خرد شده در تعداد متفاوتی از قطعات پشتیبانی نمی شود.
- برای بازیابی از خرابی سرور پارامتر بدون راه اندازی مجدد کار هماهنگ کننده پشتیبانی نمی شود.
- استفاده از
tf.lookup.StaticHashTable
(که معمولاً توسط برخی از لایههای پیشپردازش Keras استفاده میشود، مانندtf.keras.layers.IntegerLookup
،tf.keras.layers.StringLookup
، وtf.keras.layers.TextVectorization
) منجر به منابعی میشود که روی آنها قرار میگیرند. هماهنگ کننده در این زمان با آموزش سرور پارامتر. این پیامدهای عملکردی برای جستجوی RPC از کارگران به هماهنگ کننده دارد. رسیدگی به این موضوع در حال حاضر اولویت بالایی دارد.
مشخصات Model.fit
- آرگومان
steps_per_epoch
درModel.fit
مورد نیاز است. می توانید مقداری را انتخاب کنید که فواصل مناسب را در یک دوره ارائه دهد. -
ParameterServerStrategy
از تماس های سفارشی که تماس های سطح دسته ای به دلایل عملکرد دارند پشتیبانی نمی کند. شما باید آن تماسها را با استفاده ازsteps_per_epoch
به تماسهای سطح دوره تبدیل کنید، به طوری که هر تعدادsteps_per_epoch
نامیده میشوند. تماسهای داخلی تحت تأثیر قرار نمیگیرند: تماسهای سطح دستهای آنها به گونهای اصلاح شدهاند که عملکردی داشته باشند. پشتیبانی از تماس های سطح دسته ای برایParameterServerStrategy
در حال برنامه ریزی است. - به همین دلیل، بر خلاف سایر استراتژیها، نوار پیشرفت و معیارها فقط در مرزهای دوره ثبت میشوند.
-
run_eagerly
پشتیبانی نمی شود.
مشخصات حلقه آموزشی سفارشی
-
ClusterCoordinator.schedule
از تضمین بازدید برای یک مجموعه داده پشتیبانی نمی کند. - وقتی از
ClusterCoordinator.create_per_worker_dataset
استفاده می شود، کل مجموعه داده باید در داخل تابع ارسال شده به آن ایجاد شود. -
tf.data.Options
در مجموعه داده ایجاد شده توسطClusterCoordinator.create_per_worker_dataset
نادیده گرفته می شود.