tf.distribute APIগুলি ব্যবহারকারীদের একটি একক মেশিন থেকে একাধিক মেশিনে তাদের প্রশিক্ষণ স্কেল করার একটি সহজ উপায় প্রদান করে। তাদের মডেল স্কেল করার সময়, ব্যবহারকারীদের একাধিক ডিভাইসে তাদের ইনপুট বিতরণ করতে হবে। tf.distribute
API প্রদান করে যা ব্যবহার করে আপনি স্বয়ংক্রিয়ভাবে ডিভাইস জুড়ে আপনার ইনপুট বিতরণ করতে পারেন।
এই নির্দেশিকাটি আপনাকে বিভিন্ন উপায় দেখাবে যেখানে আপনি tf.distribute
API ব্যবহার করে বিতরণ করা ডেটাসেট এবং পুনরাবৃত্তিকারী তৈরি করতে পারেন। অতিরিক্তভাবে, নিম্নলিখিত বিষয়গুলি কভার করা হবে:
- tf.distribute.Strategy.experimental_distribute_dataset এবং tf.distribute.Strategy.distribute_datasets_from_function ব্যবহার করার সময় ব্যবহার,
tf.distribute.Strategy.experimental_distribute_dataset
এবংtf.distribute.Strategy.distribute_datasets_from_function
। - বিভিন্ন উপায়ে আপনি বিতরণ করা ডেটাসেটের উপর পুনরাবৃত্তি করতে পারেন।
-
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API এবংtf.data
API-এর মধ্যে পার্থক্য এবং সেইসাথে ব্যবহারকারীরা তাদের ব্যবহারের ক্ষেত্রে যে কোনো সীমাবদ্ধতা দেখা দিতে পারে।
এই নির্দেশিকাটি কেরাস API-এর সাথে বিতরণ করা ইনপুটের ব্যবহার কভার করে না।
বিতরণ করা ডেটাসেট
স্কেল করার জন্য tf.distribute
API ব্যবহার করতে, ব্যবহারকারীদের তাদের ইনপুট উপস্থাপন করতে tf.data.Dataset
ব্যবহার করার পরামর্শ দেওয়া হয়। tf.distribute
কে tf.data.Dataset এর সাথে দক্ষতার সাথে কাজ করার জন্য তৈরি করা হয়েছে (উদাহরণস্বরূপ, প্রতিটি অ্যাক্সিলারেটর ডিভাইসে ডেটার স্বয়ংক্রিয় tf.data.Dataset
) কর্মক্ষমতা অপ্টিমাইজেশানগুলিকে নিয়মিতভাবে বাস্তবায়নে অন্তর্ভুক্ত করা হচ্ছে। আপনার যদি tf.data.Dataset
ব্যতীত অন্য কিছু ব্যবহার করার ক্ষেত্রে একটি ব্যবহারের ক্ষেত্রে থাকে, তাহলে অনুগ্রহ করে এই নির্দেশিকায় পরবর্তী বিভাগটি দেখুন। একটি নন-ডিস্ট্রিবিউটেড ট্রেনিং লুপে, ব্যবহারকারীরা প্রথমে একটি tf.data.Dataset
উদাহরণ তৈরি করে এবং তারপরে উপাদানগুলির উপর পুনরাবৃত্তি করে। উদাহরণ স্বরূপ:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
ব্যবহারকারীদের একটি ব্যবহারকারীর বিদ্যমান কোডে ন্যূনতম পরিবর্তন সহ tf.distribute
কৌশল ব্যবহার করার অনুমতি দেওয়ার জন্য, দুটি API চালু করা হয়েছিল যা একটি tf.data.Dataset
উদাহরণ বিতরণ করবে এবং একটি বিতরণ করা ডেটাসেট অবজেক্ট ফিরিয়ে দেবে। একজন ব্যবহারকারী তখন এই বিতরণ করা ডেটাসেট উদাহরণের উপর পুনরাবৃত্তি করতে পারে এবং তাদের মডেলকে আগের মতো প্রশিক্ষণ দিতে পারে। আসুন এখন আমরা দুটি API - tf.distribute.Strategy.experimental_distribute_dataset
এবং tf.distribute.Strategy.distribute_datasets_from_function
আরও বিস্তারিতভাবে দেখি:
tf.distribute.Strategy.experimental_distribute_dataset
ব্যবহার
এই APIটি ইনপুট হিসাবে একটি tf.data.Dataset
উদাহরণ নেয় এবং একটি tf.distribute.DistributedDataset
উদাহরণ প্রদান করে। আপনার ইনপুট ডেটাসেটটিকে এমন একটি মান সহ ব্যাচ করা উচিত যা গ্লোবাল ব্যাচের আকারের সমান। এই বিশ্বব্যাপী ব্যাচের আকার হল নমুনার সংখ্যা যা আপনি 1 ধাপে সমস্ত ডিভাইস জুড়ে প্রক্রিয়া করতে চান। আপনি পাইথনিক ফ্যাশনে এই বিতরণ করা ডেটাসেটের উপর পুনরাবৃত্তি করতে পারেন বা iter
ব্যবহার করে একটি ইটারেটর তৈরি করতে পারেন। প্রত্যাবর্তিত বস্তুটি কোনো tf.data.Dataset
উদাহরণ নয় এবং অন্য কোনো API সমর্থন করে না যা কোনোভাবেই ডেটাসেটকে রূপান্তরিত বা পরিদর্শন করে। আপনি যদি বিভিন্ন প্রতিলিপিতে আপনার ইনপুট ভাগ করতে চান এমন নির্দিষ্ট উপায় না থাকলে এটি প্রস্তাবিত API।
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
বৈশিষ্ট্য
ব্যাচিং
tf.distribute
ইনপুট tf.data.Dataset
ইন্সট্যান্সকে একটি নতুন ব্যাচের আকারের সাথে রিব্যাচ করে যা সিঙ্কে থাকা প্রতিলিপিগুলির সংখ্যা দ্বারা বিভক্ত গ্লোবাল ব্যাচের আকারের সমান। সিঙ্কের প্রতিলিপির সংখ্যা প্রশিক্ষণের সময় গ্রেডিয়েন্ট অলরিডুসে অংশ নিচ্ছে এমন ডিভাইসের সংখ্যার সমান। যখন একজন ব্যবহারকারী বিতরণ করা পুনরাবৃত্তিকারীতে next
কল করে, তখন প্রতিটি প্রতিলিপিতে একটি প্রতি প্রতিলিপি ব্যাচ আকারের ডেটা ফেরত দেওয়া হয়। পুনঃব্যাচ করা ডেটাসেট কার্ডিনালিটি সর্বদা প্রতিলিপিগুলির সংখ্যার একাধিক হবে৷ এখানে উদাহরণ তুলে ধরা হলো:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- বিতরণ ছাড়া:
- ব্যাচ 1: [0, 1, 2, 3]
- ব্যাচ 2: [4, 5]
2টির বেশি প্রতিলিপি বিতরণ সহ। শেষ ব্যাচটি ([4, 5]) 2টি প্রতিলিপির মধ্যে বিভক্ত।
ব্যাচ 1:
- প্রতিরূপ 1:[0, 1]
- প্রতিরূপ 2:[2, 3]
ব্যাচ 2:
- প্রতিরূপ 2: [4]
- প্রতিরূপ 2: [5]
tf.data.Dataset.range(4).batch(4)
- বিতরণ ছাড়া:
- ব্যাচ 1: [[0], [1], [2], [3]]
- 5টির বেশি প্রতিলিপি বিতরণ সহ:
- ব্যাচ 1:
- প্রতিরূপ 1: [0]
- প্রতিরূপ 2: [1]
- প্রতিরূপ 3: [2]
- প্রতিরূপ 4: [3]
- প্রতিরূপ 5: []
tf.data.Dataset.range(8).batch(4)
- বিতরণ ছাড়া:
- ব্যাচ 1: [0, 1, 2, 3]
- ব্যাচ 2: [4, 5, 6, 7]
- 3টির বেশি প্রতিলিপি বিতরণ সহ:
- ব্যাচ 1:
- প্রতিরূপ 1: [0, 1]
- প্রতিরূপ 2: [2, 3]
- রেপ্লিকা 3: []
- ব্যাচ 2:
- প্রতিরূপ 1: [4, 5]
- প্রতিরূপ 2: [6, 7]
- রেপ্লিকা 3: []
ডেটাসেট রিব্যাচ করার একটি স্থান জটিলতা রয়েছে যা প্রতিলিপির সংখ্যার সাথে রৈখিকভাবে বৃদ্ধি পায়। এর মানে হল যে বহু কর্মী প্রশিক্ষণ ব্যবহারের ক্ষেত্রে ইনপুট পাইপলাইনে OOM ত্রুটি হতে পারে।
শেয়ারিং
tf.distribute
MultiWorkerMirroredStrategy
TPUStrategy-এর সাথে মাল্টি ওয়ার্কার প্রশিক্ষণে ইনপুট ডেটাসেটকে TPUStrategy
করে। প্রতিটি ডেটাসেট কর্মীর CPU ডিভাইসে তৈরি করা হয়। কর্মীদের একটি সেটের উপর একটি ডেটাসেট অটোশার্ড করার অর্থ হল প্রতিটি কর্মীকে সমগ্র ডেটাসেটের একটি উপসেট বরাদ্দ করা হয়েছে (যদি সঠিক tf.data.experimental.AutoShardPolicy
সেট করা থাকে)। এটি নিশ্চিত করার জন্য যে প্রতিটি ধাপে, প্রতিটি কর্মীর দ্বারা অ ওভারল্যাপিং ডেটাসেট উপাদানগুলির একটি গ্লোবাল ব্যাচের আকার প্রক্রিয়া করা হবে। অটোশার্ডিংয়ের কয়েকটি ভিন্ন বিকল্প রয়েছে যা tf.data.experimental.DistributeOptions
ব্যবহার করে নির্দিষ্ট করা যেতে পারে। মনে রাখবেন যে ParameterServerStrategy
সার্ভার স্ট্র্যাটেজির সাথে মাল্টি ওয়ার্কার প্রশিক্ষণে কোনো অটোশার্ডিং নেই, এবং এই কৌশলটির সাহায্যে ডেটাসেট তৈরির বিষয়ে আরও তথ্য প্যারামিটার সার্ভার স্ট্র্যাটেজি টিউটোরিয়ালে পাওয়া যাবে।
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
আপনি tf.data.experimental.AutoShardPolicy
এর জন্য তিনটি ভিন্ন বিকল্প সেট করতে পারেন:
- স্বয়ংক্রিয়: এটি ডিফল্ট বিকল্প যার মানে FILE দ্বারা শার্ড করার চেষ্টা করা হবে। একটি ফাইল-ভিত্তিক ডেটাসেট সনাক্ত না হলে FILE দ্বারা শার্ড করার প্রচেষ্টা ব্যর্থ হয়৷
tf.distribute
তারপর DATA দ্বারা শার্ডিংয়ে ফিরে আসবে। মনে রাখবেন যে ইনপুট ডেটাসেট যদি ফাইল-ভিত্তিক হয় কিন্তু ফাইলের সংখ্যা শ্রমিকের সংখ্যার চেয়ে কম হয়, তাহলে একটিInvalidArgumentError
উত্থাপিত হবে। যদি এটি ঘটে থাকে, স্পষ্টভাবে নীতিটিকেAutoShardPolicy.DATA
তে সেট করুন বা আপনার ইনপুট উত্সকে ছোট ফাইলগুলিতে বিভক্ত করুন যাতে ফাইলের সংখ্যা কর্মীদের সংখ্যার চেয়ে বেশি হয়৷ ফাইল: আপনি যদি সমস্ত কর্মীদের উপর ইনপুট ফাইলগুলি শার্ড করতে চান তবে এই বিকল্প। আপনার এই বিকল্পটি ব্যবহার করা উচিত যদি ইনপুট ফাইলের সংখ্যা কর্মীদের সংখ্যার চেয়ে অনেক বেশি হয় এবং ফাইলের ডেটা সমানভাবে বিতরণ করা হয়। এই বিকল্পের নেতিবাচক দিক হল নিষ্ক্রিয় কর্মী থাকা যদি ফাইলের ডেটা সমানভাবে বিতরণ করা না হয়। যদি ফাইলের সংখ্যা কর্মীদের সংখ্যার চেয়ে কম হয়, তাহলে একটি
InvalidArgumentError
উত্থাপিত হবে। যদি এটি ঘটে, স্পষ্টভাবে নীতিটিকেAutoShardPolicy.DATA
তে সেট করুন। উদাহরণ স্বরূপ, আসুন আমরা 2 জন কর্মীদের উপর 2টি ফাইল বিতরণ করি যার প্রতিটিতে 1টি প্রতিরূপ রয়েছে। ফাইল 1 এ রয়েছে [0, 1, 2, 3, 4, 5] এবং ফাইল 2 এ রয়েছে [6, 7, 8, 9, 10, 11]। সিঙ্কে প্রতিলিপির মোট সংখ্যা 2 এবং বিশ্বব্যাপী ব্যাচের আকার 4 হতে দিন।- কর্মী 0:
- ব্যাচ 1 = প্রতিরূপ 1: [0, 1]
- ব্যাচ 2 = প্রতিরূপ 1: [2, 3]
- ব্যাচ 3 = প্রতিরূপ 1: [4]
- ব্যাচ 4 = প্রতিরূপ 1: [5]
- কর্মী 1:
- ব্যাচ 1 = প্রতিরূপ 2: [6, 7]
- ব্যাচ 2 = প্রতিরূপ 2: [8, 9]
- ব্যাচ 3 = রেপ্লিকা 2: [10]
- ব্যাচ 4 = প্রতিরূপ 2: [11]
ডেটা: এটি সমস্ত কর্মীদের জুড়ে উপাদানগুলিকে অটোশার্ড করবে৷ প্রতিটি কর্মী সম্পূর্ণ ডেটাসেট পড়বে এবং শুধুমাত্র এটিতে বরাদ্দকৃত শার্ড প্রক্রিয়া করবে। অন্য সব shards বাতিল করা হবে. এটি সাধারণত ব্যবহার করা হয় যদি ইনপুট ফাইলের সংখ্যা কর্মীদের সংখ্যার চেয়ে কম হয় এবং আপনি সমস্ত কর্মীদের জুড়ে ডেটার আরও ভাল ভাগ করতে চান। নেতিবাচক দিক হল যে সমগ্র ডেটাসেট প্রতিটি কর্মীর উপর পড়া হবে। উদাহরণ স্বরূপ, আসুন আমরা 2 কর্মীদের উপর 1 টি ফাইল বিতরণ করি। ফাইল 1 এ রয়েছে [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]। সিঙ্কে প্রতিলিপিগুলির মোট সংখ্যা 2 হতে দিন।
- কর্মী 0:
- ব্যাচ 1 = প্রতিরূপ 1: [0, 1]
- ব্যাচ 2 = প্রতিরূপ 1: [4, 5]
- ব্যাচ 3 = প্রতিরূপ 1: [8, 9]
- কর্মী 1:
- ব্যাচ 1 = প্রতিরূপ 2: [2, 3]
- ব্যাচ 2 = প্রতিরূপ 2: [6, 7]
- ব্যাচ 3 = প্রতিরূপ 2: [10, 11]
বন্ধ: আপনি যদি অটোশার্ডিং বন্ধ করেন, প্রতিটি কর্মী সমস্ত ডেটা প্রক্রিয়া করবে। উদাহরণ স্বরূপ, আসুন আমরা 2 কর্মীদের উপর 1 টি ফাইল বিতরণ করি। ফাইল 1 এ রয়েছে [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]। সিঙ্কে প্রতিলিপির মোট সংখ্যা 2 হতে দিন। তারপর প্রতিটি কর্মী নিম্নলিখিত বিতরণ দেখতে পাবে:
- কর্মী 0:
- ব্যাচ 1 = প্রতিরূপ 1: [0, 1]
- ব্যাচ 2 = প্রতিরূপ 1: [2, 3]
- ব্যাচ 3 = প্রতিরূপ 1: [4, 5]
- ব্যাচ 4 = প্রতিরূপ 1: [6, 7]
- ব্যাচ 5 = প্রতিরূপ 1: [8, 9]
ব্যাচ 6 = প্রতিরূপ 1: [10, 11]
কর্মী 1:
ব্যাচ 1 = প্রতিরূপ 2: [0, 1]
ব্যাচ 2 = প্রতিরূপ 2: [2, 3]
ব্যাচ 3 = প্রতিরূপ 2: [4, 5]
ব্যাচ 4 = প্রতিরূপ 2: [6, 7]
ব্যাচ 5 = প্রতিরূপ 2: [8, 9]
ব্যাচ 6 = প্রতিরূপ 2: [10, 11]
প্রিফেচিং
ডিফল্টরূপে, tf.distribute
ব্যবহারকারীর দেওয়া tf.data.Dataset
উদাহরণের শেষে একটি প্রিফেচ রূপান্তর যোগ করে। প্রিফেচ ট্রান্সফর্মেশনের আর্গুমেন্ট যা buffer_size
সিঙ্কে থাকা প্রতিলিপির সংখ্যার সমান।
tf.distribute.Strategy.distribute_datasets_from_function
ব্যবহার
এই API একটি ইনপুট ফাংশন নেয় এবং একটি tf.distribute.DistributedDataset
উদাহরণ প্রদান করে। ব্যবহারকারীরা যে ইনপুট ফাংশনটি পাস করে তাতে একটি tf.distribute.InputContext
আর্গুমেন্ট থাকে এবং একটি tf.data.Dataset
উদাহরণ প্রদান করা উচিত। এই API-এর সাহায্যে, tf.distribute
ব্যবহারকারীর tf.data.Dataset
ইনপুট ফাংশন থেকে ফিরে আসা উদাহরণে আর কোনো পরিবর্তন করে না। ডেটাসেটটি ব্যাচ এবং শার্ড করার দায়িত্ব ব্যবহারকারীর। tf.distribute
প্রতিটি শ্রমিকের CPU ডিভাইসে ইনপুট ফাংশনকে কল করে। ব্যবহারকারীদেরকে তাদের নিজস্ব ব্যাচিং এবং শার্ডিং লজিক নির্দিষ্ট করার অনুমতি দেওয়ার পাশাপাশি, মাল্টি ওয়ার্কার প্রশিক্ষণের জন্য ব্যবহার করার সময় tf.distribute.Strategy.experimental_distribute_dataset
তুলনায় এই APIটি আরও ভাল মাপযোগ্যতা এবং কর্মক্ষমতা প্রদর্শন করে।
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
বৈশিষ্ট্য
ব্যাচিং
tf.data.Dataset
দৃষ্টান্ত যা ইনপুট ফাংশনের রিটার্ন মান প্রতি রেপ্লিকা ব্যাচ আকার ব্যবহার করে ব্যাচ করা উচিত। প্রতি প্রতিলিপি ব্যাচের আকার হল বিশ্বব্যাপী ব্যাচের আকারকে সিঙ্ক প্রশিক্ষণে অংশ নেওয়া প্রতিলিপিগুলির সংখ্যা দ্বারা ভাগ করা হয়। কারণ tf.distribute
প্রতিটি শ্রমিকের CPU ডিভাইসে ইনপুট ফাংশনকে কল করে। একটি প্রদত্ত কর্মীর উপর তৈরি করা ডেটাসেটটি সেই কর্মীর সমস্ত প্রতিলিপি ব্যবহার করার জন্য প্রস্তুত হওয়া উচিত।
শেয়ারিং
tf.distribute.InputContext
অবজেক্ট যেটি ব্যবহারকারীর ইনপুট ফাংশনে একটি আর্গুমেন্ট হিসাবে অন্তর্নিহিতভাবে পাস করা হয় তা হুডের নীচে tf.distribute
দ্বারা তৈরি করা হয়। এটিতে কর্মীদের সংখ্যা, বর্তমান কর্মী আইডি ইত্যাদি সম্পর্কে তথ্য রয়েছে৷ এই ইনপুট ফাংশনটি tf.distribute.InputContext
অবজেক্টের অংশ এই বৈশিষ্ট্যগুলি ব্যবহার করে ব্যবহারকারীর দ্বারা নির্ধারিত নীতি অনুসারে শার্ডিং পরিচালনা করতে পারে৷
প্রিফেচিং
tf.distribute
ব্যবহারকারীর প্রদত্ত ইনপুট ফাংশন দ্বারা ফেরত দেওয়া tf.data.Dataset
এর শেষে একটি প্রিফেচ রূপান্তর যোগ করে না।
বিতরণ করা পুনরাবৃত্তিকারী
নন-ডিস্ট্রিবিউটেড tf.data.Dataset
দৃষ্টান্তগুলির মতো, আপনাকে tf.distribute.DistributedDataset
দৃষ্টান্তগুলির উপর পুনরাবৃত্তি করতে এবং tf.distribute.DistributedDataset
এর উপাদানগুলি অ্যাক্সেস করতে একটি পুনরাবৃত্তিকারী তৈরি করতে হবে। নিম্নলিখিত উপায়ে আপনি একটি tf.distribute.DistributedIterator
তৈরি করতে পারেন এবং আপনার মডেলকে প্রশিক্ষণ দিতে এটি ব্যবহার করতে পারেন:
ব্যবহার
লুপ নির্মাণের জন্য একটি পাইথনিক ব্যবহার করুন
আপনি tf.distribute.DistributedDataset এর উপর পুনরাবৃত্তি করতে একটি ব্যবহারকারী বান্ধব tf.distribute.DistributedDataset
লুপ ব্যবহার করতে পারেন। tf.distribute.DistributedIterator
থেকে প্রত্যাবর্তিত উপাদানগুলি একটি একক tf.Tensor
বা একটি tf.distribute.DistributedValues
হতে পারে যাতে প্রতি প্রতিলিপিতে একটি মান থাকে৷ একটি tf.function
এর ভিতরে লুপ স্থাপন করলে কর্মক্ষমতা বৃদ্ধি পাবে। যাইহোক, break
এবং return
বর্তমানে একটি tf.distribute.DistributedDataset এর উপর একটি লুপের জন্য সমর্থিত নয় যা একটি tf.distribute.DistributedDataset
এর ভিতরে স্থাপন করা tf.function
।
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
একটি স্পষ্ট পুনরাবৃত্তিকারী তৈরি করতে iter
ব্যবহার করুন
একটি tf.distribute.DistributedDataset
উদাহরণের উপাদানগুলির উপর পুনরাবৃত্তি করতে, আপনি iter
API ব্যবহার করে একটি tf.distribute.DistributedIterator
তৈরি করতে পারেন। একটি স্পষ্ট পুনরাবৃত্তিকারীর সাহায্যে, আপনি একটি নির্দিষ্ট সংখ্যক ধাপের জন্য পুনরাবৃত্তি করতে পারেন। একটি tf.distribute.DistributedIterator
ইনস্ট্যান্স dist_iterator
থেকে পরবর্তী উপাদান পেতে, আপনি next(dist_iterator)
, dist_iterator.get_next()
, অথবা dist_iterator.get_next_as_optional()
কল করতে পারেন। পূর্ববর্তী দুটি মূলত একই:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
next()
বা tf.distribute.DistributedIterator.get_next()
এর সাথে, যদি tf.distribute.DistributedIterator
শেষ হয়ে যায়, একটি OutOfRange ত্রুটি নিক্ষেপ করা হবে। ক্লায়েন্ট পাইথন সাইডে ত্রুটিটি ধরতে পারে এবং চেকপয়েন্টিং এবং মূল্যায়নের মতো অন্যান্য কাজ চালিয়ে যেতে পারে। যাইহোক, এটি কাজ করবে না যদি আপনি একটি হোস্ট ট্রেনিং লুপ ব্যবহার করেন (অর্থাৎ, tf.function
প্রতি একাধিক ধাপ চালান), যা দেখতে এইরকম:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
একটি tf.range
এর ভিতরে স্টেপ বডি গুটিয়ে একাধিক ধাপ ধারণ করে। এই ক্ষেত্রে, কোনো নির্ভরতা ছাড়া লুপের বিভিন্ন পুনরাবৃত্তি সমান্তরালভাবে শুরু হতে পারে, তাই পূর্ববর্তী পুনরাবৃত্তির গণনা শেষ হওয়ার আগে একটি OutOfRange ত্রুটি পরবর্তী পুনরাবৃত্তিগুলিতে ট্রিগার করা যেতে পারে। একবার একটি OutOfRange ত্রুটি নিক্ষিপ্ত হলে, ফাংশনের সমস্ত অপ্স অবিলম্বে বন্ধ হয়ে যাবে। যদি এটি এমন কিছু ক্ষেত্রে হয় যা আপনি এড়াতে চান, একটি বিকল্প যা একটি OutOfRange ত্রুটি ফেলে না তা হল tf.distribute.DistributedIterator.get_next_as_optional()
। get_next_as_optional
একটি tf.experimental.Optional
করে যাতে পরবর্তী উপাদান থাকে বা কোন মান থাকে না যদি tf.distribute.DistributedIterator
শেষ হয়ে যায়।
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
element_spec
সম্পত্তি ব্যবহার করে
আপনি যদি একটি বিতরণ করা ডেটাসেটের উপাদানগুলিকে একটি tf.function
এ পাস করেন এবং একটি tf.TypeSpec
গ্যারান্টি চান, আপনি tf.function
এর input_signature
যুক্তি নির্দিষ্ট করতে পারেন। একটি বিতরণ করা ডেটাসেটের আউটপুট হল tf.distribute.DistributedValues
যা একটি একক ডিভাইস বা একাধিক ডিভাইসে ইনপুট উপস্থাপন করতে পারে। এই বিতরণ করা মানের সাথে সম্পর্কিত tf.TypeSpec
পেতে আপনি বিতরণ করা ডেটাসেট বা বিতরণ করা element_spec
বৈশিষ্ট্য ব্যবহার করতে পারেন।
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
আংশিক ব্যাচ
আংশিক ব্যাচের সম্মুখীন হয় যখন tf.data.Dataset
ইন্সট্যান্স যা ব্যবহারকারীরা তৈরি করেন তাতে ব্যাচের আকার থাকতে পারে যা প্রতিলিপির সংখ্যা দ্বারা সমানভাবে বিভাজ্য নয় অথবা যখন ডেটাসেট ইন্সট্যান্সের কার্ডিনালিটি ব্যাচের আকার দ্বারা বিভাজ্য না হয়। এর মানে হল যে যখন ডেটাসেটটি একাধিক প্রতিলিপিতে বিতরণ করা হয়, তখন কিছু পুনরাবৃত্তিকারীর next
কলের ফলে একটি OutOfRangeError হবে। এই ব্যবহার কেসটি পরিচালনা করার জন্য, tf.distribute
ব্যাচ সাইজ 0 এর ডামি ব্যাচগুলি রিপ্লিকাগুলিতে ফেরত দেয় যেগুলিতে প্রক্রিয়া করার জন্য আর কোনও ডেটা নেই৷
একক কর্মীর ক্ষেত্রে, যদি ইটারেটারে next
কলে ডেটা ফেরত না আসে, 0 ব্যাচ আকারের ডামি ব্যাচ তৈরি করা হয় এবং ডেটাসেটে আসল ডেটার সাথে ব্যবহার করা হয়। আংশিক ব্যাচের ক্ষেত্রে, ডেটার শেষ গ্লোবাল ব্যাচে ডেটার ডামি ব্যাচের পাশাপাশি বাস্তব ডেটা থাকবে। ডেটা প্রসেস করার স্টপিং কন্ডিশন এখন চেক করে যে কোনো প্রতিলিপিতে ডেটা আছে কিনা। কোনো প্রতিলিপিতে কোনো ডেটা না থাকলে, একটি OutOfRange ত্রুটি নিক্ষেপ করা হয়।
মাল্টি ওয়ার্কার ক্ষেত্রে, প্রতিটি শ্রমিকের ডেটা উপস্থিতির প্রতিনিধিত্বকারী বুলিয়ান মান ক্রস রেপ্লিকা কমিউনিকেশন ব্যবহার করে একত্রিত করা হয় এবং এটি সনাক্ত করতে ব্যবহৃত হয় যে সমস্ত কর্মী বিতরণ করা ডেটাসেট প্রক্রিয়াকরণ শেষ করেছে কিনা। যেহেতু এটি ক্রস ওয়ার্কার কমিউনিকেশন জড়িত তাই কিছু পারফরম্যান্স পেনাল্টি জড়িত।
সতর্কতা
একাধিক কর্মী সেটআপের সাথে
tf.distribute.Strategy.experimental_distribute_dataset
API ব্যবহার করার সময়, ব্যবহারকারীরা ফাইল থেকে পড়া একটিtf.data.Dataset
পাস করে। যদিtf.data.experimental.AutoShardPolicy
AUTO
বাFILE
এ সেট করা থাকে, তাহলে প্রকৃত প্রতি ধাপে ব্যাচের আকার ব্যবহারকারীর দ্বারা নির্ধারিত গ্লোবাল ব্যাচের আকারের চেয়ে ছোট হতে পারে। ফাইলের অবশিষ্ট উপাদানগুলি গ্লোবাল ব্যাচের আকারের চেয়ে কম হলে এটি ঘটতে পারে। ব্যবহারকারীরা হয় চালানোর জন্য ধাপের সংখ্যার উপর নির্ভর না করেই ডেটাসেটটি নিষ্কাশন করতে পারে বা এটিকে ঘিরে কাজ করার জন্যDATA
tf.data.experimental.AutoShardPolicy
তে সেট করতে পারে।স্টেটফুল ডেটাসেট ট্রান্সফরমেশন বর্তমানে
tf.distribute
এর সাথে সমর্থিত নয় এবং ডেটাসেটের যে কোনো স্টেটফুল অপ্স বর্তমানে উপেক্ষা করা হয়েছে। উদাহরণস্বরূপ, যদি আপনার ডেটাসেটে একটিmap_fn
যা একটি চিত্র ঘোরানোর জন্যtf.random.uniform
ব্যবহার করে, তাহলে আপনার কাছে একটি ডেটাসেট গ্রাফ রয়েছে যা স্থানীয় মেশিনে যেখানে পাইথন প্রক্রিয়াটি চালানো হচ্ছে তার অবস্থার (যেমন এলোমেলো বীজ) উপর নির্ভর করে।পরীক্ষামূলক
tf.data.experimental.OptimizationOptions
যেগুলি ডিফল্টরূপে অক্ষম করা হয় কিছু নির্দিষ্ট প্রসঙ্গে -- যেমনtf.distribute
সাথে একসাথে ব্যবহার করা হলে -- কার্যক্ষমতার অবনতি ঘটাতে পারে। আপনি একটি বিতরণ সেটিংসে আপনার কাজের চাপের কার্যকারিতাকে উপকৃত করে তা যাচাই করার পরেই আপনার তাদের সক্ষম করা উচিত।সাধারণভাবে
tf.data
দিয়ে কীভাবে আপনার ইনপুট পাইপলাইন অপ্টিমাইজ করবেন তার জন্য অনুগ্রহ করে এই নির্দেশিকাটি পড়ুন। কিছু অতিরিক্ত টিপস:আপনার যদি একাধিক কর্মী থাকে এবং আপনি এক বা একাধিক গ্লোব প্যাটার্নের সাথে মিলে যাওয়া সমস্ত ফাইল থেকে একটি ডেটাসেট তৈরি করতে
tf.data.Dataset.list_files
ব্যবহার করছেন, তাহলেseed
যুক্তি সেট করতে বাshuffle=False
সেট করতে মনে রাখবেন যাতে প্রতিটি কর্মী ফাইলটিকে ধারাবাহিকভাবে ভাগ করতে পারে।যদি আপনার ইনপুট পাইপলাইনে রেকর্ড স্তরে ডেটা শাফেল করা এবং ডেটা পার্স করা উভয়ই অন্তর্ভুক্ত থাকে, যদি না পার্স করা ডেটা পার্স করা ডেটার থেকে উল্লেখযোগ্যভাবে বড় হয় (যা সাধারণত হয় না), নীচের উদাহরণে দেখানো হিসাবে প্রথমে শাফেল করুন এবং তারপর পার্স করুন৷ এটি মেমরি ব্যবহার এবং কর্মক্ষমতা উপকৃত হতে পারে।
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
buffer_size
উপাদানগুলির একটি অভ্যন্তরীণ বাফার বজায় রাখে এবং এইভাবেbuffer_size
হ্রাস করা OOM সমস্যাকে উপশম করতে পারে।tf.distribute.experimental_distribute_dataset
বাtf.distribute.distribute_datasets_from_function
ব্যবহার করার সময় কর্মীরা যে ক্রমানুসারে ডেটা প্রক্রিয়াজাত করে তার নিশ্চয়তা নেই। এটি সাধারণত প্রয়োজন হয় যদি আপনি স্কেল ভবিষ্যদ্বাণী করতেtf.distribute
ব্যবহার করেন। তবে আপনি ব্যাচের প্রতিটি উপাদানের জন্য একটি সূচক সন্নিবেশ করতে পারেন এবং সেই অনুযায়ী আউটপুট অর্ডার করতে পারেন। নিচের স্নিপেটটি কিভাবে আউটপুট অর্ডার করতে হয় তার একটি উদাহরণ।
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
আমি যদি ক্যানোনিকাল tf.data.Dataset উদাহরণ ব্যবহার না করি তাহলে আমি কীভাবে আমার ডেটা বিতরণ করব?
কখনও কখনও ব্যবহারকারীরা তাদের ইনপুট এবং পরবর্তীতে একাধিক ডিভাইসে ডেটাসেট বিতরণ করার জন্য উপরে উল্লিখিত APIগুলি উপস্থাপন করতে tf.data.Dataset
ব্যবহার করতে পারে না। এই ধরনের ক্ষেত্রে আপনি একটি জেনারেটর থেকে কাঁচা টেনসর বা ইনপুট ব্যবহার করতে পারেন।
নির্বিচারে টেনসর ইনপুটগুলির জন্য পরীক্ষামূলক_ডিস্ট্রিবিউট_মান_ফাংশন ব্যবহার করুন
strategy.run
tf.distribute.DistributedValues
গ্রহণ করে যা next(iterator)
এর আউটপুট। টেনসরের মানগুলি পাস করতে, কাঁচা টেনসর থেকে tf.distribute.DistributedValues
করতে experimental_distribute_values_from_function
distribute_values_from_function ব্যবহার করুন।
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
আপনার ইনপুট একটি জেনারেটর থেকে হলে tf.data.Dataset.from_generator ব্যবহার করুন
আপনার যদি একটি জেনারেটর ফাংশন থাকে যা আপনি ব্যবহার করতে চান, আপনি from_generator
API ব্যবহার করে একটি tf.data.Dataset
উদাহরণ তৈরি করতে পারেন।
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.