הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת |
במדריך זה, נסביר עקרונות העיצוב מאחורי tff.aggregators
מודול ושיטות עבודה מומלצות ליישום צבירה אישית של ערכים מלקוחות לשרת.
דרישות מוקדמות. הדרכה זו מניחה שאתה כבר מכיר מושגים בסיסיים של Federated Core כגון מיקומים ( tff.SERVER
, tff.CLIENTS
), איך TFF מייצג חישובים ( tff.tf_computation
, tff.federated_computation
) וחתימות סוגם.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
תקציר עיצוב
בשנת TFF, "צבירה" מתייחסת לתנועה של סט של ערכים על tff.CLIENTS
לייצר בשווי מצטבר מאותו הסוג על tff.SERVER
. כלומר, כל ערך לקוח בודד לא צריך להיות זמין. לדוגמה בלמידה מאוחדת, ממוצע עדכוני מודל לקוח מקבל עדכון מודל מצטבר שיחול על המודל הגלובלי בשרת.
בנוסף למפעילים להשיג את היעד הזה כגון tff.federated_sum
, TFF מספק tff.templates.AggregationProcess
(א תהליך מצבי ) אשר מסדיר את החתימה סוג עבור חישוב צבירה אז זה יכול להכליל לצורות מורכבות יותר מאשר סכום פשוטה.
הרכיבים העיקריים של tff.aggregators
מודול מוקמים מפעלי בריאת AggregationProcess
, אשר נועדו להיות אבני הבניין שימושי replacable בדרך של TFF בשני היבטים:
- חישובים עם פרמטרים. הצבירה מהווה אבן בניין עצמאית שיכול להיות פקוק לתוך מודולים TFF אחרים שנועדו לעבודה עם
tff.aggregators
כדי parameterize צבירת דרוש שלהם.
דוגמא:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- הרכב צבירה. ניתן להרכיב אבן בניין צבירה עם אבני בניין אחרות של צבירה כדי ליצור צבירה מורכבת יותר.
דוגמא:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
שאר המדריך הזה מסביר כיצד משיגים שתי מטרות אלו.
תהליך צבירה
אנחנו ראשונים לסכם את tff.templates.AggregationProcess
, ופעלו עם דפוס המפעל ליצירה שלו.
tff.templates.AggregationProcess
הוא tff.templates.MeasuredProcess
עם חתימות מסוג שצוינו עבור צבירה. בפרט, initialize
ו next
הפונקציות יש חתימות מהסוג הבא:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
המדינה (מסוג state_type
) יש להציב בשרת. next
הפונקציה לוקחת כארגומנט קלט מדינת ערך יסוכם (מסוג value_type
) ממוקמים בחלק לקוחות. *
דרכים חלופיות טענות קלט אחרים, משקולות למשל ממוצע משוקלל. הוא מחזיר אובייקט מצב מעודכן, הערך המצטבר מאותו סוג שהוצב בשרת וכמה מדידות.
שימו לב ששתי המדינה להעביר בין הוצאות להורג של next
הפונקציה, ואת המדידות דיווחו נועדו לדווח על כול מידע ותלוי ביצוע ספציפי של next
הפונקציה, עשוי להיות ריקים. עם זאת, יש לציין אותם במפורש כדי שחלקים אחרים של TFF יהיו בעלי חוזה ברור.
מודולים TFF אחרים, למשל את העדכונים מודל tff.learning
, צפויים להשתמש tff.templates.AggregationProcess
כדי parameterize כיצד ערכים הם מצטברים. עם זאת, מה בדיוק הם הערכים המצטברים ומהן חתימות הסוג שלהם, תלוי בפרטים אחרים של המודל המתאמן ואלגוריתם הלמידה המשמש לעשות זאת.
כדי להפוך צבירה עצמאית של היבטים האחרים של חישובים, אנו משתמשים הדפוס במפעל - אנו יוצרים את מתאים tff.templates.AggregationProcess
פעם חתימות הסוג הרלוונטיות של חפצים כדי לצבור זמינות, על ידי ההעלאה create
שיטה של המפעל. טיפול ישיר בתהליך הצבירה נחוץ אפוא רק עבור מחברי הספרייה, האחראים ליצירה זו.
מפעלי תהליך צבירה
ישנן שתי מחלקות בסיס מופשטות עבור צבירה לא משוקללת ומשוקללת. שלהם create
שיטה לוקח חתימות סוג של ערך תסוכם ומחזירה tff.templates.AggregationProcess
עבור צבירה של ערכים כאלה.
התהליך נוצר על ידי tff.aggregators.UnweightedAggregationFactory
לוקח שתי טענות קלט: (1) מדינה על שרת (2) ערך של סוג מסוים value_type
.
דוגמה ליישום הוא tff.aggregators.SumFactory
.
התהליך נוצר על ידי tff.aggregators.WeightedAggregationFactory
לוקח שלושה טיעונים קלט: (1) המדינה על השרת, (2) ערך של סוג מסוים value_type
ו (3) משקל מסוג weight_type
, כפי שצוין על ידי המשתמש של המפעל כאשר הפנייה שלה create
שיטה.
דוגמה ליישום הוא tff.aggregators.MeanFactory
אשר מחשב ממוצע משוקלל.
דפוס המפעל הוא איך אנחנו משיגים את המטרה הראשונה שצוינה לעיל; שהצבירה היא אבן בניין עצמאית. לדוגמה, כאשר משנים אילו משתני מודל ניתנים לאימון, צבירה מורכבת לא בהכרח צריכה להשתנות; המפעל המייצג הוא יופעל עם חתימה מסוג שונה כאשר נעשה שימוש בשיטה כגון tff.learning.build_federated_averaging_process
.
קומפוזיציות
נזכיר שתהליך צבירה כללי יכול להכיל (א) עיבוד מקדים של הערכים אצל לקוחות, (ב) תנועה של ערכים מלקוח לשרת, ו-(ג) עיבוד אחר של הערך המצטבר בשרת. המטרה השנייה כאמור לעיל, הרכב צבירה, הוא הבין שבתוך tff.aggregators
המודול ע"י הבניית יישום מפעלים צבירה כזה חלק (ב) ניתן להאציל למפעל צבירה אחר.
במקום להטמיע את כל ההיגיון הדרוש בתוך מחלקה יחידה של מפעל, ההטמעות ממוקדות כברירת מחדל בהיבט אחד הרלוונטי לצבירה. בעת הצורך, דפוס זה מאפשר לנו להחליף את אבני הבניין אחת בכל פעם.
דוגמא לכך היא המשוקלל tff.aggregators.MeanFactory
. היישום שלו מכפיל ערכים ומשקלים שסופקו אצל לקוחות, לאחר מכן מסכם את הערכים המשוקללים והמשקלים באופן עצמאי, ולאחר מכן מחלק את סכום הערכים המשוקללים בסכום המשקולות בשרת. במקום ליישם לסיכומים ידי שימוש ישיר tff.federated_sum
מפעיל, את הסיכום מואצל שני מקרים של tff.aggregators.SumFactory
.
מבנה כזה מאפשר את החלפת שני סיכומי ברירת המחדל במפעלים שונים, המממשים את הסכום בצורה שונה. לדוגמה, tff.aggregators.SecureSumFactory
, או יישום מותאם אישית של tff.aggregators.UnweightedAggregationFactory
. לעומת זאת, זמן, tff.aggregators.MeanFactory
יכול עצמו להיות צבירה פנימית של מפעל אחר כגון tff.aggregators.clipping_factory
, אם הערכים הם להיות מקוטעים לפני המיצוע.
עיין קודם הכוונון מומלץ מצבורים ללימוד הדרכה לשימושי receommended של מנגנון הרכב באמצעות מפעלים קיימים tff.aggregators
מודול.
שיטות עבודה מומלצות לפי דוגמה
אנחנו הולכים כדי להמחיש את tff.aggregators
מושג בפירוט על ידי יישום משימת דוגמא פשוט, ולהפוך אותו יותר ויותר כלליים. דרך נוספת ללמוד היא להסתכל על היישום של מפעלים קיימים.
import collections
import tensorflow as tf
import tensorflow_federated as tff
במקום סיכום value
, המשימה למשל היא לסכם value * 2.0
ולאחר מכן לחלק את הסכום על ידי 2.0
. תוצאת הצבירה היא ובכך השווה מתמטי סיכום ישירות value
, ועלולה להיחשב מורכב משלושה חלקים: (1) קנו מידה על לקוחות (2) סיכום פני לקוחות (3) unscaling על שרת.
בעקבות העיצוב שהוסבר לעיל, ההיגיון ייושם כפי תת מחלקה של tff.aggregators.UnweightedAggregationFactory
, יוצרת המתאימה tff.templates.AggregationProcess
כאשר קבלו value_type
כדי המצרפי:
יישום מינימלי
עבור המשימה לדוגמה, החישובים הדרושים הם תמיד זהים, כך שאין צורך בשימוש במצב. הוא ובכך לרוקן, ו מיוצג tff.federated_value((), tff.SERVER)
. כך גם לגבי מדידות, לעת עתה.
היישום המינימלי של המשימה הוא אפוא כדלקמן:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
ניתן לאמת אם הכל עובד כמצופה באמצעות הקוד הבא:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
אמינות ומדידות
נעשה שימוש נרחב ב-Statefulness ב-TFF כדי לייצג חישובים שצפויים להתבצע באופן איטרטיבי ולהשתנות עם כל איטרציה. לדוגמה, המצב של חישוב למידה מכיל את משקלי המודל הנלמד.
כדי להמחיש כיצד להשתמש במצב בחישוב צבירה, אנו משנים את המשימה לדוגמה. במקום הכפלת value
על ידי 2.0
, נכפיל את זה על ידי מדד איטרציה - מספר הפעמים צבירה כבר להורג.
לשם כך, אנו זקוקים לדרך לעקוב אחר מדד האיטרציה, אשר מושג באמצעות מושג המדינה. בשנתי ה initialize_fn
, במקום ליצור מדינה ריקה, אנו לאתחל את המדינה להיות אפס סקלר. ואז, מדינה יכולה לשמש next_fn
בשלושה שלבים: (1) תוספת של 1.0
, (2) שימוש להכפיל value
, ו (3) בתמורה כמדינת העדכון החדשה.
ברגע שזה נעשה, ייתכן לב: אבל בדיוק באותו קוד כאמור יכול לשמש כדי לאמת את כל העבודות כצפוי. איך אני יודע שמשהו באמת השתנה?
שאלה טובה! זה המקום שבו מושג המדידות הופך שימושי. באופן כללי, מדידות יכולות לדווח על כול ערך רלוונטי ביצוע יחיד של next
הפונקציה, אשר יכול לשמש לניטור. במקרה זה, זה יכול להיות summed_value
מהדוגמה הקודמת. כלומר, הערך שלפני שלב "ביטול קנה מידה", שאמור להיות תלוי במדד האיטרציה. שוב, זה לא בהכרח שימושי בפועל, אבל ממחיש את המנגנון הרלוונטי.
התשובה הממלכתית למשימה נראית כך:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
הערה כי state
שמגיע לתוך next_fn
כקלט מושם על השרת. על מנת להשתמש בו לקוחות, היא זקוקה ראשונה שתועבר, אשר מושגת באמצעות tff.federated_broadcast
מפעיל.
כדי לאמת את כל העבודות כצפוי, עכשיו אנחנו יכולים להסתכל דיווחו measurements
, שאמור להיות שונה עם כל סיבוב של ביצוע, גם אם בטווח באותה client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
סוגים מובנים
משקלי המודל של מודל מאומן בלמידה מאוחדת מיוצגים בדרך כלל כאוסף של טנסורים, ולא כטנזור בודד. בשנת TFF, זה מיוצג tff.StructType
ומפעלי צבירה שימושיים בדרך כלל צריכים להיות מסוגל לקבל את הסוגים המובנים.
עם זאת, בדוגמא לעיל, עבדנו רק עם tff.TensorType
אובייקט. אם ננסה להשתמש במפעל הקודם כדי ליצור תהליך צבירה עם tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
, אנחנו מקבלים שגיאה מוזרה בגלל TensorFlow ינסה להכפיל tf.Tensor
וכן list
.
הבעיה היא שבמקום הכפלת מבנה tensors בקבוע, אנחנו צריכים להכפיל כל מותח במבנה בקבוע. הפתרון השכיח לבעיה זו הוא להשתמש tf.nest
בתוך מודול של יצר tff.tf_computation
הים.
הגרסה של הקודם ExampleTaskFactory
תואם סוגים מובנים ובכך נראה כדלקמן:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
דוגמה זו מדגישה דפוס שעשוי להיות שימושי לביצוע בעת בניית קוד TFF. כאשר לא מתמודד עם פעולות פשוט מאוד, את הקוד הופך יותר קריא כאשר tff.tf_computation
הים כי ישמש לבנייה בלוקים בתוך tff.federated_computation
נוצר במקום נפרד. בפנים של tff.federated_computation
, אבני הבניין האלה מחוברים רק באמצעות מפעילי מהותי.
כדי לוודא שזה עובד כצפוי:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
צבירות פנימיות
השלב האחרון הוא אופציונלי לאפשר האצלה של הצבירה בפועל למפעלים אחרים, על מנת לאפשר הרכבה קלה של טכניקות צבירה שונות.
זו מושגת על ידי יצירת אופציונלי inner_factory
הטיעון של בנאי של שלנו ExampleTaskFactory
. אם לא צוין, tff.aggregators.SumFactory
משמש, המחיל את tff.federated_sum
המפעיל המשמש ישירות בסעיף הקודם.
כאשר create
נקרא, נוכל להתקשר ראשון create
של inner_factory
ליצור תהליך הצבירה הפנימי עם אותו value_type
.
מצב התהליך שלנו שמחזיר initialize_fn
הוא רכב משני חלקים: המדינה נוצרה על ידי "זה" תהליך, ומצב התהליך הפנימי עתה יצר.
יישום next_fn
השונה כי הצבירה בפועל לאצילה next
הפונקציה של התהליך הפנימי, וכן כיצד התוצר הסופי מורכב. המדינה היא שוב מורכבת ו "זה" מדינה "פנימית", ומדידות מורכבות בצורה דומה כמו OrderedDict
.
להלן יישום של דפוס כזה.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
כאשר האצלה אל inner_process.next
הפונקציה, המבנה ובתמורה אנו מקבלים הוא tff.templates.MeasuredProcessOutput
, עם אותם השלושה שדות - state
, result
ו measurements
. בעת יצירת מבנה התשואה הכוללת של תהליך הצבירה המורכב, את state
ואת measurements
השדות צריכים להיות מורכבים בדרך כלל וחזרו יחד. לעומת זאת, result
תואמת השדה לערך נצבר ובמקום "זורם דרך" ההצטברות המורכבת.
state
האובייקט יש לראות פרט ליישום המפעל, ובכך הרכב יכול להיות של כול מבנה. עם זאת, measurements
מתאימות ערכים ידווחו למשתמש בשלב כלשהו. לכן, אנו ממליצים להשתמש OrderedDict
, עם מורכב שמות כאלה שזה יהיה ברור היכן רכב עושה דיווח מטרי מגיע.
שים לב גם את השימוש של tff.federated_zip
המפעיל. state
האובייקט contolled ידי התהליך שנוצר צריך להיות tff.FederatedType
. אם כן חזרנו במקום (this_state, inner_state)
ב initialize_fn
, חתימת סוג החזרה שלה תהיה tff.StructType
המכיל 2-tuple של tff.FederatedType
ים. השימוש tff.federated_zip
"מעליות" את tff.FederatedType
לרמה העליונה. זה משמש באופן דומה next_fn
בעת הכנת המדינה והמדידות תוחזרנה.
לבסוף, אנו יכולים לראות כיצד ניתן להשתמש בזה עם הצבירה הפנימית המוגדרת כברירת מחדל:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... ועם צבירה פנימית אחרת. לדוגמה, ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
סיכום
במדריך זה, הסברנו את שיטות העבודה המומלצות שיש לבצע כדי ליצור אבן בניין צבירה למטרות כלליות, המיוצגת כמפעל צבירה. הכלליות מגיעה דרך כוונת העיצוב בשתי דרכים:
- חישובים עם פרמטרים. הצבירה מהווה אבן בניין עצמאית שיכול להיות פקוק לתוך מודולים TFF אחרים שנועדו לעבודה עם
tff.aggregators
כדי parameterize צבירת דרוש שלהם, כגוןtff.learning.build_federated_averaging_process
. - הרכב צבירה. ניתן להרכיב אבן בניין צבירה עם אבני בניין אחרות של צבירה כדי ליצור צבירה מורכבת יותר.