عرض على TensorFlow.org | تشغيل في Google Colab | عرض المصدر على جيثب | تحميل دفتر |
تعتبر فكرة مجموعة البيانات التي يرمز إليها العملاء (مثل المستخدمين) ضرورية للحساب الموحد كما هو موضح في TFF. يوفر TFF واجهة tff.simulation.datasets.ClientData
مجردة على هذا المفهوم، ومجموعات البيانات التي تستضيف TFF ( ستاكوفيرفلوو ، شكسبير ، emnist ، cifar100 ، و gldv2 ) عن تنفيذ هذه الواجهة.
إذا كنت تعمل على التعلم الاتحادية مع مجموعة البيانات الخاصة بك، TFF تشجع بشدة على تنفيذ إما ClientData
اجهة أو استخدام واحدة من وظائف المساعد TFF لتوليد ClientData
التي تمثل البيانات على القرص، على سبيل المثال tff.simulation.datasets.ClientData.from_clients_and_fn
.
لأن معظم TFF في الأمثلة نهاية إلى نهاية تبدأ ClientData
الكائنات، وتنفيذ ClientData
التفاعل مع مجموعة البيانات المخصصة سيجعل من الاسهل لspelunk من خلال التعليمات البرمجية الموجودة مكتوبة مع TFF. علاوة على ذلك، tf.data.Datasets
التي ClientData
بنيات يمكن كرر أكثر مباشرة لانتاج هياكل numpy
صفائف، لذلك ClientData
كائنات يمكن استخدامها مع أي إطار ML مقرها بيثون قبل أن ينتقل إلى TFF.
هناك العديد من الأنماط التي يمكنك من خلالها تسهيل حياتك إذا كنت تنوي توسيع نطاق عمليات المحاكاة الخاصة بك إلى العديد من الأجهزة أو نشرها. أدناه سوف نسير من خلال عدد قليل من الطرق التي يمكننا استخدامها ClientData
وTFF لجعل لدينا على نطاق صغير التكرار لنطاق واسع من التجارب لإنتاج تجربة نشر بأكبر قدر من السلاسة.
ما هو النمط الذي يجب علي استخدامه لتمرير ClientData إلى TFF؟
سوف نناقش اثنين الأعراف TFF في ClientData
في العمق. إذا كنت تندرج في أي من الفئتين أدناه ، فمن الواضح أنك ستفضل واحدة على الأخرى. إذا لم يكن الأمر كذلك ، فقد تحتاج إلى فهم أكثر تفصيلاً لإيجابيات وسلبيات كل منها لاتخاذ خيار أكثر دقة.
أريد التكرار في أسرع وقت ممكن على جهاز محلي ؛ لست بحاجة إلى أن أكون قادرًا على الاستفادة بسهولة من وقت تشغيل TFF الموزع.
- تريد تمرير
tf.data.Datasets
في لTFF مباشرة. - وهذا يسمح لك البرنامج حتما مع
tf.data.Dataset
الكائنات، ومعالجتها بشكل تعسفي. - يوفر مرونة أكثر من الخيار أدناه ؛ يتطلب دفع المنطق للعملاء أن يكون هذا المنطق قابلاً للتسلسل.
- تريد تمرير
أريد تشغيل حسابي المتحد في وقت تشغيل TFF البعيد ، أو أخطط للقيام بذلك قريبًا.
- في هذه الحالة ، تريد تعيين بناء مجموعة البيانات والمعالجة المسبقة للعملاء.
- هذه النتائج فيكم تمر ببساطة قائمة
client_ids
مباشرة إلى حساب الاتحادية الخاص بك. - يؤدي دفع إنشاء مجموعة البيانات والمعالجة المسبقة للعملاء إلى تجنب الاختناقات في التسلسل ، ويزيد بشكل كبير من الأداء مع مئات الآلاف من العملاء.
قم بإعداد بيئة مفتوحة المصدر
# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
حزم الاستيراد
import collections
import time
import tensorflow as tf
import tensorflow_federated as tff
معالجة كائن ClientData
دعونا نبدأ اليوم التحميل واستكشاف TFF في EMNIST ClientData
:
client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s] 2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
يمكن أن تفقد بيانات الأولى تخبرنا ما هو نوع من الأمثلة هي في ClientData
.
first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
علما بأن عائدات مجموعة البيانات collections.OrderedDict
الكائنات التي لها pixels
و label
مفاتيح، حيث بكسل هو موتر مع شكل [28, 28]
. لنفترض أننا نود أن تتسطح المدخلات للخروج إلى شكل [784]
. أن أحد السبل الممكنة يمكننا أن نفعل هذا يكون لتطبيق وظيفة ما قبل التصنيع لدينا ClientData
الكائن.
def preprocess_dataset(dataset):
"""Create batches of 5 examples, and limit to 3 batches."""
def map_fn(input):
return collections.OrderedDict(
x=tf.reshape(input['pixels'], shape=(-1, 784)),
y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
)
return dataset.batch(5).map(
map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)
preprocessed_client_data = client_data.preprocess(preprocess_dataset)
# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
قد نرغب بالإضافة إلى إجراء بعض المعالجة المسبقة الأكثر تعقيدًا (وربما ذات الحالة) ، على سبيل المثال الخلط.
def preprocess_and_shuffle(dataset):
"""Applies `preprocess_dataset` above and shuffles the result."""
preprocessed = preprocess_dataset(dataset)
return preprocessed.shuffle(buffer_size=5)
preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)
# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
التواصل مع tff.Computation
الآن يمكننا أن أداء بعض التلاعب الأساسية مع ClientData
الأشياء، ونحن مستعدون للبيانات تغذية ل tff.Computation
. نحدد tff.templates.IterativeProcess
الذي ينفذ اتحاد المتوسط ، واستكشاف طرق مختلفة لتمرير البيانات.
def model_fn():
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
])
return tff.learning.from_keras_model(
model,
# Note: input spec is the _batched_ shape, and includes the
# label tensor which will be passed to the loss function. This model is
# therefore configured to accept data _after_ it has been preprocessed.
input_spec=collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))
وقبل أن نبدأ العمل مع هذه IterativeProcess
، تعليق واحد على دلالات ClientData
في محله. A ClientData
يمثل الكائن مجمل السكان متاح للتدريب الاتحادية، التي بشكل عام هو غير متوفرة للبيئة تنفيذ نظام FL إنتاج وغير محددة لمحاكاة. ClientData
الواقع يعطي المستخدم القدرة على تجاوز الحوسبة الاتحادية تماما وببساطة تدريب نموذج من جانب الخادم كالمعتاد عبر ClientData.create_tf_dataset_from_all_clients
.
بيئة محاكاة TFF تضع الباحث في سيطرة كاملة على الحلقة الخارجية. على وجه الخصوص ، هذا يعني أن اعتبارات توفر العميل ، وانقطاع العميل ، وما إلى ذلك ، يجب معالجتها من قبل المستخدم أو برنامج بايثون النصي. يمكن للمرء عن التسرب العميل مثال نموذجي عن طريق ضبط توزيع المعاينة على الخاص ClientData's
client_ids
مثل أن المستخدمين مع المزيد من البيانات (وتبعا لأطول تشغيل الحسابات المحلية) وسيتم اختيار مع انخفاض احتمال.
ومع ذلك ، في النظام الفيدرالي الحقيقي ، لا يمكن اختيار العملاء بشكل صريح من قبل المدرب النموذجي ؛ يتم تفويض اختيار العملاء للنظام الذي يقوم بتنفيذ الحساب الموحد.
يمر tf.data.Datasets
مباشرة إلى TFF
خيار واحد لدينا للتفاعل بين ClientData
و IterativeProcess
هو أن بناء tf.data.Datasets
في بيثون، وتمرير هذه المجموعات إلى TFF.
لاحظ أن إذا أردنا استخدام preprocessed لدينا ClientData
مجموعات البيانات سلمنا هي من النوع المناسب المتوقع من قبل نموذجنا المحدد أعلاه.
selected_client_ids = preprocessed_and_shuffled.client_ids[:10]
preprocessed_data_for_clients = [
preprocessed_and_shuffled.create_tf_dataset_for_client(
selected_client_ids[i]) for i in range(10)
]
state = trainer.initialize()
for _ in range(5):
t1 = time.time()
state, metrics = trainer.next(state, preprocessed_data_for_clients)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` loss 2.9005744457244873, round time 4.576513767242432 loss 3.113278388977051, round time 0.49641919136047363 loss 2.7581865787506104, round time 0.4904160499572754 loss 2.87259578704834, round time 0.48976993560791016 loss 3.1202380657196045, round time 0.6724586486816406
وإذا أخذنا هذا الطريق، ولكننا لن تكون قادرة على التحرك بشكل مسلي لمحاكاة المتعدد ذات. مجموعات البيانات نبني في وقت TensorFlow المحلي يمكن التقاط دولة من البيئة المحيطة الثعبان، وتفشل في التسلسل أو إلغاء التسلسل عندما تحاول الدولة المرجعية التي لم تعد متوفرة لهم. هذا يمكن أن تظهر على سبيل المثال في خطأ غامض من TensorFlow في tensor_util.cc
:
Check failed: DT_VARIANT == input.dtype() (21 vs. 20)
رسم خرائط البناء والمعالجة المسبقة للعملاء
لتجنب هذه المشكلة، توصي TFF مستخدميها للنظر في مجموعة البيانات مثيل وتجهيزها بوصفها شيئا ما يحدث محليا على كل عميل، واستخدام المساعدين TFF أو federated_map
لتشغيل صراحة هذا الرمز تجهيزها في كل عميل.
من الناحية المفاهيمية ، فإن سبب تفضيل ذلك واضح: في وقت التشغيل المحلي لـ TFF ، يكون للعملاء "عرضًا" فقط إمكانية الوصول إلى بيئة Python العالمية نظرًا لحقيقة أن التنسيق الموحد بأكمله يحدث على جهاز واحد. من الجدير بالملاحظة في هذه المرحلة أن التفكير المماثل يؤدي إلى فلسفة TFF الوظيفية عبر الأنظمة الأساسية والقابلة للتسلسل دائمًا.
TFF يجعل مثل هذا التغيير بسيط عبر ClientData's
السمة dataset_computation
، و tff.Computation
الذي يأخذ client_id
وإرجاع المرتبطة tf.data.Dataset
.
لاحظ أن preprocess
يعمل ببساطة مع dataset_computation
. و dataset_computation
سمة من preprocessed ClientData
تضم خط أنابيب تجهيزها بالكامل حددنا فقط:
print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing: (string -> <label=int32,pixels=float32[28,28]>*) dataset computation with preprocessing: (string -> <x=float32[?,784],y=int64[?,1]>*)
نحن يمكن استدعاء dataset_computation
والحصول على بيانات حريصة في وقت التشغيل بيثون، ولكن يمارس السلطة الحقيقية لهذا النهج عندما يؤلف مع عملية تكرارية أو حساب آخر لتجنب تجسيد هذه المجموعات في وقت حريصة العالمي على الإطلاق. يوفر TFF وظيفة مساعد tff.simulation.compose_dataset_computation_with_iterative_process
التي يمكن استخدامها لتفعل بالضبط هذا.
trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
preprocessed_and_shuffled.dataset_computation, trainer)
كل من هذه tff.templates.IterativeProcesses
واحدة فوق تشغيل بنفس الطريقة. ولكن السابق يقبل مجموعات البيانات العميل preprocessed، وهذا الأخير يقبل سلاسل تمثل هويات العملاء، ومعالجة كل بناء مجموعة البيانات وتجهيزها في الجسم - في الواقع state
يمكن أن تنتقل بين البلدين.
for _ in range(5):
t1 = time.time()
state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023 loss 2.7670371532440186, round time 0.5207102298736572 loss 2.665048122406006, round time 0.5302855968475342 loss 2.7213189601898193, round time 0.5313887596130371 loss 2.580148935317993, round time 0.5283482074737549
التحجيم لأعداد كبيرة من العملاء
trainer_accepting_ids
يمكن على الفور أن تستخدم في وقت التشغيل المتعدد ذات TFF، وويتجنب تتجسد tf.data.Datasets
وحدة تحكم (وبالتالي تسلسل منهم وإرسالها إلى العمال).
يؤدي هذا إلى تسريع عمليات المحاكاة الموزعة بشكل كبير ، خاصة مع عدد كبير من العملاء ، ويتيح التجميع الوسيط لتجنب حدوث تسلسل / إلغاء تسلسل مماثل.
deepdive الاختياري: تكوين منطق المعالجة المسبقة يدويًا في TFF
تم تصميم TFF للتكوين من الألف إلى الياء ؛ نوع التكوين الذي يؤديه للتو مساعد TFF تحت سيطرتنا تمامًا كمستخدمين. أننا يمكن أن يكون يدويا تكوين حساب تجهيزها حددنا للتو مع المدرب تلقاء next
بكل بساطة:
selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)
@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
return trainer.next(server_state, preprocessed_data)
manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)
في الواقع ، هذا هو ما يفعله المساعد الذي استخدمناه بشكل فعال تحت الغطاء (بالإضافة إلى إجراء فحص ومعالجة مناسبة للنوع). نحن يمكن حتى أعربوا عن نفس منطق مختلف قليلا، من خلال تسلسل preprocess_and_shuffle
إلى tff.Computation
، ومتحللة federated_map
في خطوة واحدة الذي يبني قواعد البيانات-preprocessed الامم المتحدة وآخر الذي يدير preprocess_and_shuffle
في كل عميل.
يمكننا التحقق من أن هذا المسار اليدوي ينتج عنه حسابات بنفس نوع التوقيع مثل مساعد TFF (أسماء معلمات النموذج):
print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>) (<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)