عرض على TensorFlow.org | تشغيل في Google Colab | عرض المصدر على جيثب | تحميل دفتر |
في هذا البرنامج التعليمي، وشرح مبادئ التصميم وراء tff.aggregators
حدة وأفضل الممارسات لتنفيذ تجميع مخصص من القيم من عملاء الخادم.
المتطلبات الأساسية. يفترض هذا البرنامج التعليمي كنت معتادا على المفاهيم الأساسية للبالفعل الاتحادية الأساسية مثل مواضع ( tff.SERVER
، tff.CLIENTS
)، كيف TFF يمثل الحسابية ( tff.tf_computation
، tff.federated_computation
) والتوقيعات نوعها.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
ملخص التصميم
في TFF، "تجميع" يشير إلى حركة مجموعة من القيم على tff.CLIENTS
لإنتاج تبلغ القيمة الإجمالية لنفس النوع على tff.SERVER
. وهذا يعني أن كل قيمة عميل فردية لا يلزم أن تكون متاحة. على سبيل المثال في التعلم الموحد ، يتم حساب متوسط تحديثات نموذج العميل للحصول على تحديث نموذج إجمالي لتطبيقه على النموذج العام على الخادم.
بالإضافة إلى مشغلي تحقيق هذا الهدف مثل tff.federated_sum
، يوفر TFF tff.templates.AggregationProcess
(أ عملية جليل ) الذي الطابع الرسمي على توقيع نوع لحساب التجميع بحيث يمكن تعميم إلى أشكال أكثر تعقيدا من مبلغ بسيط.
المكونات الرئيسية لل tff.aggregators
حدة من المصانع لإنشاء AggregationProcess
، والتي صممت لتكون اللبنات مفيدة وreplacable عموما TFF في جانبين:
- الحسابات ذات المعاملات. تجميع لبنة المستقل الذي يمكن توصيله إلى وحدات TFF أخرى مصممة للعمل مع
tff.aggregators
إلى بالحدود تجميع واللازمة.
مثال:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- تكوين التجميع. يمكن أن تتكون الكتلة البرمجية الإنشائية للتجميع من وحدات بناء التجميع الأخرى لإنشاء تجميعات مركبة أكثر تعقيدًا.
مثال:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
يوضح الجزء المتبقي من هذا البرنامج التعليمي كيفية تحقيق هذين الهدفين.
عملية التجميع
نحن تلخيص أول tff.templates.AggregationProcess
، واتبع مع نمط مصنع لإنشائها.
و tff.templates.AggregationProcess
هو tff.templates.MeasuredProcess
مع التوقيعات النوع المحدد لتجميع. على وجه الخصوص، initialize
و next
وظائف لديها التواقيع نوع ما يلي:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
الدولة (من نوع state_type
) يجب أن توضع في الخدمة. و next
يأخذ وظيفة وسيطة الإدخال الدولة والقيمة التي سيتم تجميعها (من نوع value_type
) وضعت في العملاء. و *
وسائل اختياري الحجج إدخال أخرى، على سبيل المثال الأوزان في المتوسط الموزون. تقوم بإرجاع كائن حالة محدث ، والقيمة المجمعة من نفس النوع الموضوعة على الخادم ، وبعض القياسات.
لاحظ أن كل من الدولة لتمريرها بين الإعدام و next
وظيفة، ويقصد القياسات ذكرت أن يقدم أي معلومات اعتمادا على تنفيذ محددة ل next
وظيفة، قد تكون فارغة. ومع ذلك ، يجب تحديدها بشكل صريح لأجزاء أخرى من TFF للحصول على عقد واضح لمتابعة.
على سبيل المثال التحديثات نموذج في وحدات TFF أخرى، tff.learning
، ومن المتوقع أن استخدام tff.templates.AggregationProcess
إلى بالحدود كيف يتم تجميع القيم. ومع ذلك ، ما هي القيم المجمعة بالضبط وما هي أنواع التوقيعات ، يعتمد على التفاصيل الأخرى للنموذج الذي يتم تدريبه وخوارزمية التعلم المستخدمة للقيام بذلك.
لجعل تجميع مستقلة عن جوانب أخرى من العمليات الحسابية، ونحن نستخدم نمط مصنع - نخلق المناسبة tff.templates.AggregationProcess
مرة واحدة تواقيع نوع ذات الصلة من الكائنات التي سيتم تجميعها متاحة، من خلال الاستناد إلى create
طريقة للمصنع. وبالتالي ، فإن المعالجة المباشرة لعملية التجميع مطلوبة فقط لمؤلفي المكتبات المسؤولين عن هذا الإنشاء.
مصانع عملية التجميع
هناك نوعان من فئات المصنع الأساسية المجردة للتجميع الموزون وغير الموزون. على create
أسلوب يأخذ نوع تواقيع القيمة التي سيتم تجميعها وإرجاع tff.templates.AggregationProcess
لتجميع هذه القيم.
عملية التي أنشأتها tff.aggregators.UnweightedAggregationFactory
يأخذ حجتين المدخلات: (1) دولة في الخادم و(2) قيمة من نوع محدد value_type
.
التنفيذ مثال على ذلك هو tff.aggregators.SumFactory
.
عملية التي أنشأتها tff.aggregators.WeightedAggregationFactory
تأخذ ثلاث حجج المدخلات: (1) دولة في الخادم، (2) قيمة من نوع محدد value_type
و (3) الوزن من نوع weight_type
، على النحو المحدد من قبل المستخدم المصنع عند استدعاء في create
الأسلوب.
التنفيذ مثال على ذلك هو tff.aggregators.MeanFactory
الذي يحسب المتوسط الموزون.
نمط المصنع هو كيف نحقق الهدف الأول المذكور أعلاه ؛ هذا التجميع هو لبنة بناء مستقلة. على سبيل المثال ، عند تغيير متغيرات النموذج التي يمكن تدريبها ، لا يحتاج التجميع المعقد بالضرورة إلى التغيير ؛ المصنع يمثل ذلك سيتم استدعاؤه مع نوع توقيع مختلف عند استخدامها بطريقة مثل tff.learning.build_federated_averaging_process
.
التراكيب
تذكر أن عملية التجميع العامة يمكن أن تشمل (أ) بعض المعالجة المسبقة للقيم عند العملاء ، (ب) حركة القيم من العميل إلى الخادم ، و (ج) بعض المعالجة اللاحقة للقيمة المجمعة على الخادم. الهدف الثاني هو مذكور أعلاه، تكوين التجميع، ويتحقق داخل tff.aggregators
الوحدة النمطية التي كتبها هيكلة تنفيذ مصانع التجميع بحيث الجزء (ب) يمكن تفويضه إلى مصنع التجميع آخر.
بدلاً من تنفيذ كل المنطق الضروري داخل فئة مصنع واحدة ، تركز عمليات التنفيذ افتراضيًا على جانب واحد ذي صلة بالتجميع. عند الحاجة ، يمكّننا هذا النمط من استبدال اللبنات الأساسية واحدة تلو الأخرى.
ومن الأمثلة على ذلك الوزني tff.aggregators.MeanFactory
. يضاعف تنفيذه القيم والأوزان المقدمة للعملاء ، ثم يجمع كل من القيم المرجحة والأوزان بشكل مستقل ، ثم يقسم مجموع القيم الموزونة على مجموع الأوزان على الخادم. بدلا من تنفيذ summations باستخدام مباشرة tff.federated_sum
المشغل، وتفويض الجمع إلى حالتين من tff.aggregators.SumFactory
.
مثل هذا الهيكل يجعل من الممكن استبدال الجمعيتين الافتراضيتين بمصانع مختلفة ، والتي تدرك المجموع بشكل مختلف. على سبيل المثال، tff.aggregators.SecureSumFactory
، أو تطبيق مخصص لل tff.aggregators.UnweightedAggregationFactory
. على العكس من ذلك، والوقت، tff.aggregators.MeanFactory
يمكن أن تكون هي نفسها تجميع الداخلي للمصنع آخر مثل tff.aggregators.clipping_factory
، إذا كانت القيم هي أن يثقب قبل المتوسط.
انظر السابق ضبط أوصى تجمعات لتعلم البرنامج التعليمي للاستخدامات receommended آلية تكوين باستخدام المصانع القائمة في tff.aggregators
حدة.
أفضل الممارسات بالقدوة
نحن نذهب لتوضيح tff.aggregators
المفاهيم في التفاصيل من خلال تنفيذ مهمة بسيطة سبيل المثال، وجعله تدريجيا أكثر عمومية. طريقة أخرى للتعلم هي النظر في تنفيذ المصانع القائمة.
import collections
import tensorflow as tf
import tensorflow_federated as tff
بدلا من تلخيص value
ومهمة المثال هو لتلخيص value * 2.0
ومن ثم تقسيم مبلغ من 2.0
. والنتيجة هي تجميع بالتالي أي ما يعادل رياضيا لتلخيص مباشرة value
، ويمكن وصفه أنه يتكون من ثلاثة أجزاء: (1) التوسع في العملاء (2) تلخيص عبر عملاء (3) unscaling في الخادم.
بعد تصميم أوضح أعلاه، سيتم تنفيذ المنطق، فئة فرعية من tff.aggregators.UnweightedAggregationFactory
، مما يخلق مناسبا tff.templates.AggregationProcess
عندما تعطى value_type
إلى مجموع المباراتين:
الحد الأدنى من التنفيذ
بالنسبة لمهمة المثال ، تكون الحسابات الضرورية هي نفسها دائمًا ، لذلك ليست هناك حاجة لاستخدام الحالة. وبالتالي تفريغ، ومثلت أنها tff.federated_value((), tff.SERVER)
. نفس الشيء ينطبق على القياسات ، في الوقت الحالي.
وبالتالي يكون الحد الأدنى من تنفيذ المهمة كما يلي:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
يمكن التحقق مما إذا كان كل شيء يعمل كما هو متوقع باستخدام الكود التالي:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
الدولة والقياسات
يتم استخدام الحالة على نطاق واسع في TFF لتمثيل الحسابات التي يُتوقع تنفيذها بشكل تكراري وتغييرها مع كل تكرار. على سبيل المثال ، تحتوي حالة حساب التعلم على أوزان النموذج الذي يتم تعلمه.
لتوضيح كيفية استخدام الحالة في حساب التجميع ، نقوم بتعديل مهمة المثال. بدلا من مضاعفة value
من 2.0
، ونحن اضربها تلاه مؤشر التكرار - عدد المرات التي تم تنفيذها في التجميع.
للقيام بذلك ، نحتاج إلى طريقة لتتبع مؤشر التكرار ، والذي يتحقق من خلال مفهوم الحالة. في initialize_fn
، بدلا من إنشاء دولة فارغة، ونحن تهيئة الدولة لتكون صفر العددية. ثم، دولة يمكن أن تستخدم في next_fn
في ثلاث خطوات: (1) زيادة من 1.0
، (2) الاستخدام لمضاعفة value
، و (3) عودة كدولة حديثة جديدة.
حالما يتم ذلك، قد لاحظ: ولكن بالضبط نفس الرمز على النحو الوارد أعلاه يمكن استخدامها للتحقق من جميع الأعمال كما هو متوقع. كيف أعرف أن شيئًا ما قد تغير بالفعل؟
سؤال جيد! هذا هو المكان الذي يصبح فيه مفهوم القياسات مفيدًا. بشكل عام، يمكن أن القياسات الإبلاغ عن أي قيمة ذات الصلة لتنفيذ واحد من next
وظيفة، والتي يمكن استخدامها لرصد. في هذه الحالة، يمكن أن يكون summed_value
من المثال السابق. أي القيمة قبل خطوة "عدم القياس" ، والتي يجب أن تعتمد على فهرس التكرار. مرة أخرى ، هذا ليس بالضرورة مفيدًا في الممارسة ، لكنه يوضح الآلية ذات الصلة.
وهكذا تبدو الإجابة الحكيمة على المهمة كما يلي:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
علما بأن state
التي تأتي في next_fn
كمدخل يوضع في الخدمة. من أجل استخدامها في عملاء، فإنه يجب أولا أن أجله، والذي يتحقق باستخدام tff.federated_broadcast
المشغل.
للتحقق من جميع الأعمال كما هو متوقع، يمكننا الآن أن ننظر إلى ذكرت measurements
، والتي ينبغي أن تكون مختلفة مع كل جولة من التنفيذ، حتى لو المدى مع نفسه client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
الأنواع المهيكلة
عادةً ما يتم تمثيل أوزان النموذج لنموذج تم تدريبه في التعلم الفيدرالي كمجموعة من الموترات ، بدلاً من موتر واحد. في TFF، وهذا ما يمثل tff.StructType
والمصانع تجميع مفيدة عموما تحتاج إلى أن تكون قادرة على قبول أنواع منظم.
ومع ذلك، في الأمثلة المذكورة أعلاه، إننا نعمل فقط مع tff.TensorType
الكائن. لو كنا في محاولة لاستخدام المصنع السابق لإنشاء عملية التجميع مع tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
، وحصلنا على خطأ غريب ل سوف TensorFlow محاولة تتضاعف tf.Tensor
و list
.
والمشكلة هي أنه بدلا من ضرب هيكل التنسورات بواسطة ثابت، ونحن بحاجة الى مضاعفة كل موتر في بنية بواسطة ثابت. الحل المعتاد لهذه المشكلة هو استخدام tf.nest
حدة داخل خلق tff.tf_computation
الصورة.
إصدار السابق ExampleTaskFactory
متوافقة مع أنواع منظم مما يبدو على النحو التالي:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
يسلط هذا المثال الضوء على نمط قد يكون من المفيد اتباعه عند هيكلة كود TFF. عندما لا تتعامل مع عمليات بسيطة جدا، رمز يصبح أكثر مقروءا عندما tff.tf_computation
الصورة التي سيتم استخدامها في بناء كتل داخل tff.federated_computation
تم إنشاؤها في مكان منفصل. داخل tff.federated_computation
، هذه اللبنات ترتبط فقط باستخدام مشغلي الجوهرية.
للتحقق من أنها تعمل كما هو متوقع:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
التجمعات الداخلية
تتمثل الخطوة الأخيرة في تمكين تفويض التجميع الفعلي للمصانع الأخرى بشكل اختياري ، من أجل السماح بالتركيب السهل لتقنيات التجميع المختلفة.
ويتحقق ذلك عن طريق إنشاء اختياري inner_factory
حجة في منشئ لدينا ExampleTaskFactory
. إذا لم يكن محددا، tff.aggregators.SumFactory
يستخدم، وهو ما ينطبق على tff.federated_sum
المشغل استخدامها مباشرة في القسم السابق.
عندما create
ما يسمى، يمكننا أولا استدعاء create
من inner_factory
لإنشاء عملية التجميع الداخلية مع نفس value_type
.
حالة عمليتنا إرجاعها بواسطة initialize_fn
هي تركيبة من جزأين: الدولة التي أنشأتها "هذا" العملية، وحالة عملية الداخلية بإنشائها.
تنفيذ next_fn
يتم تفويض يختلف في أن التجميع الفعلي لل next
ظيفة عملية داخلية، وكيف يتكون الناتج النهائي. وتتكون الدولة من جديد من "هذا" والدولة "الداخلية"، وتتكون القياسات بطريقة مماثلة بوصفها OrderedDict
.
ما يلي هو تنفيذ مثل هذا النمط.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
عندما يفوض إلى inner_process.next
وظيفة، وهيكل عودة نحصل هو tff.templates.MeasuredProcessOutput
، مع نفس الحقول الثلاثة - state
، result
و measurements
. عند إنشاء بنية العودة الشامل لعملية تجميع تتكون، و state
و measurements
الحقول يجب أن تتكون عموما وعاد معا. في المقابل، فإن result
يتوافق الحقل إلى القيمة التي تجمع وبدلا من ذلك "تدفق من خلال" التجميع يتكون.
و state
ينبغي أن ينظر إلى وجوه باعتبارها التفاصيل تنفيذ المصنع، وبالتالي تكوين يمكن أن يكون في أي هيكل. ومع ذلك، measurements
تتوافق مع القيم ورود أنباء عن أن المستخدم في مرحلة ما. ولذلك، فإننا ننصح استخدام OrderedDict
، مع تتكون تسمية بحيث يكون واضحا حيث في تكوين يفعل ذكرت متري يأتي من.
لاحظ أيضا استخدام tff.federated_zip
المشغل. و state
يجب أن يكون الكائن كان يسيطر بواسطة عملية خلق tff.FederatedType
. لو كان لدينا بدلا من ذلك عاد (this_state, inner_state)
في initialize_fn
، فإن توقيع نوع عودتها تكون tff.StructType
تحتوي على 2-الصفوف (tuple) من tff.FederatedType
الصورة. استخدام tff.federated_zip
"المصاعد" على tff.FederatedType
إلى مستوى أعلى. ويستخدم هذا بالمثل في next_fn
عند إعداد الدولة والقياسات يتم إرجاعها.
أخيرًا ، يمكننا أن نرى كيف يمكن استخدام ذلك مع التجميع الداخلي الافتراضي:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... وبتجميع داخلي مختلف. على سبيل المثال، ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
ملخص
في هذا البرنامج التعليمي ، أوضحنا أفضل الممارسات التي يجب اتباعها من أجل إنشاء كتلة بناء تجميع للأغراض العامة ، ممثلة كمصنع تجميع. تأتي العمومية من خلال نية التصميم بطريقتين:
- الحسابات ذات المعاملات. تجميع لبنة المستقل الذي يمكن توصيله إلى وحدات TFF أخرى مصممة للعمل مع
tff.aggregators
إلى بالحدود تجميع واللازمة، مثلtff.learning.build_federated_averaging_process
. - تكوين التجميع. يمكن أن تتكون الكتلة البرمجية الإنشائية للتجميع من وحدات بناء التجميع الأخرى لإنشاء تجميعات مركبة أكثر تعقيدًا.