יישום אגרגציות מותאמות אישית

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

במדריך זה, נסביר עקרונות העיצוב מאחורי tff.aggregators מודול ושיטות עבודה מומלצות ליישום צבירה אישית של ערכים מלקוחות לשרת.

דרישות מוקדמות. הדרכה זו מניחה שאתה כבר מכיר מושגים בסיסיים של Federated Core כגון מיקומים ( tff.SERVER , tff.CLIENTS ), איך TFF מייצג חישובים ( tff.tf_computation , tff.federated_computation ) וחתימות סוגם.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

תקציר עיצוב

בשנת TFF, "צבירה" מתייחסת לתנועה של סט של ערכים על tff.CLIENTS לייצר בשווי מצטבר מאותו הסוג על tff.SERVER . כלומר, כל ערך לקוח בודד לא צריך להיות זמין. לדוגמה בלמידה מאוחדת, ממוצע עדכוני מודל לקוח מקבל עדכון מודל מצטבר שיחול על המודל הגלובלי בשרת.

בנוסף למפעילים להשיג את היעד הזה כגון tff.federated_sum , TFF מספק tff.templates.AggregationProcessתהליך מצבי ) אשר מסדיר את החתימה סוג עבור חישוב צבירה אז זה יכול להכליל לצורות מורכבות יותר מאשר סכום פשוטה.

הרכיבים העיקריים של tff.aggregators מודול מוקמים מפעלי בריאת AggregationProcess , אשר נועדו להיות אבני הבניין שימושי replacable בדרך של TFF בשני היבטים:

  1. חישובים עם פרמטרים. הצבירה מהווה אבן בניין עצמאית שיכול להיות פקוק לתוך מודולים TFF אחרים שנועדו לעבודה עם tff.aggregators כדי parameterize צבירת דרוש שלהם.

דוגמא:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. הרכב צבירה. ניתן להרכיב אבן בניין צבירה עם אבני בניין אחרות של צבירה כדי ליצור צבירה מורכבת יותר.

דוגמא:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

שאר המדריך הזה מסביר כיצד משיגים שתי מטרות אלו.

תהליך צבירה

אנחנו ראשונים לסכם את tff.templates.AggregationProcess , ופעלו עם דפוס המפעל ליצירה שלו.

tff.templates.AggregationProcess הוא tff.templates.MeasuredProcess עם חתימות מסוג שצוינו עבור צבירה. בפרט, initialize ו next הפונקציות יש חתימות מהסוג הבא:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

המדינה (מסוג state_type ) יש להציב בשרת. next הפונקציה לוקחת כארגומנט קלט מדינת ערך יסוכם (מסוג value_type ) ממוקמים בחלק לקוחות. * דרכים חלופיות טענות קלט אחרים, משקולות למשל ממוצע משוקלל. הוא מחזיר אובייקט מצב מעודכן, הערך המצטבר מאותו סוג שהוצב בשרת וכמה מדידות.

שימו לב ששתי המדינה להעביר בין הוצאות להורג של next הפונקציה, ואת המדידות דיווחו נועדו לדווח על כול מידע ותלוי ביצוע ספציפי של next הפונקציה, עשוי להיות ריקים. עם זאת, יש לציין אותם במפורש כדי שחלקים אחרים של TFF יהיו בעלי חוזה ברור.

מודולים TFF אחרים, למשל את העדכונים מודל tff.learning , צפויים להשתמש tff.templates.AggregationProcess כדי parameterize כיצד ערכים הם מצטברים. עם זאת, מה בדיוק הם הערכים המצטברים ומהן חתימות הסוג שלהם, תלוי בפרטים אחרים של המודל המתאמן ואלגוריתם הלמידה המשמש לעשות זאת.

כדי להפוך צבירה עצמאית של היבטים האחרים של חישובים, אנו משתמשים הדפוס במפעל - אנו יוצרים את מתאים tff.templates.AggregationProcess פעם חתימות הסוג הרלוונטיות של חפצים כדי לצבור זמינות, על ידי ההעלאה create שיטה של המפעל. טיפול ישיר בתהליך הצבירה נחוץ אפוא רק עבור מחברי הספרייה, האחראים ליצירה זו.

מפעלי תהליך צבירה

ישנן שתי מחלקות בסיס מופשטות עבור צבירה לא משוקללת ומשוקללת. שלהם create שיטה לוקח חתימות סוג של ערך תסוכם ומחזירה tff.templates.AggregationProcess עבור צבירה של ערכים כאלה.

התהליך נוצר על ידי tff.aggregators.UnweightedAggregationFactory לוקח שתי טענות קלט: (1) מדינה על שרת (2) ערך של סוג מסוים value_type .

דוגמה ליישום הוא tff.aggregators.SumFactory .

התהליך נוצר על ידי tff.aggregators.WeightedAggregationFactory לוקח שלושה טיעונים קלט: (1) המדינה על השרת, (2) ערך של סוג מסוים value_type ו (3) משקל מסוג weight_type , כפי שצוין על ידי המשתמש של המפעל כאשר הפנייה שלה create שיטה.

דוגמה ליישום הוא tff.aggregators.MeanFactory אשר מחשב ממוצע משוקלל.

דפוס המפעל הוא איך אנחנו משיגים את המטרה הראשונה שצוינה לעיל; שהצבירה היא אבן בניין עצמאית. לדוגמה, כאשר משנים אילו משתני מודל ניתנים לאימון, צבירה מורכבת לא בהכרח צריכה להשתנות; המפעל המייצג הוא יופעל עם חתימה מסוג שונה כאשר נעשה שימוש בשיטה כגון tff.learning.build_federated_averaging_process .

קומפוזיציות

נזכיר שתהליך צבירה כללי יכול להכיל (א) עיבוד מקדים של הערכים אצל לקוחות, (ב) תנועה של ערכים מלקוח לשרת, ו-(ג) עיבוד אחר של הערך המצטבר בשרת. המטרה השנייה כאמור לעיל, הרכב צבירה, הוא הבין שבתוך tff.aggregators המודול ע"י הבניית יישום מפעלים צבירה כזה חלק (ב) ניתן להאציל למפעל צבירה אחר.

במקום להטמיע את כל ההיגיון הדרוש בתוך מחלקה יחידה של מפעל, ההטמעות ממוקדות כברירת מחדל בהיבט אחד הרלוונטי לצבירה. בעת הצורך, דפוס זה מאפשר לנו להחליף את אבני הבניין אחת בכל פעם.

דוגמא לכך היא המשוקלל tff.aggregators.MeanFactory . היישום שלו מכפיל ערכים ומשקלים שסופקו אצל לקוחות, לאחר מכן מסכם את הערכים המשוקללים והמשקלים באופן עצמאי, ולאחר מכן מחלק את סכום הערכים המשוקללים בסכום המשקולות בשרת. במקום ליישם לסיכומים ידי שימוש ישיר tff.federated_sum מפעיל, את הסיכום מואצל שני מקרים של tff.aggregators.SumFactory .

מבנה כזה מאפשר את החלפת שני סיכומי ברירת המחדל במפעלים שונים, המממשים את הסכום בצורה שונה. לדוגמה, tff.aggregators.SecureSumFactory , או יישום מותאם אישית של tff.aggregators.UnweightedAggregationFactory . לעומת זאת, זמן, tff.aggregators.MeanFactory יכול עצמו להיות צבירה פנימית של מפעל אחר כגון tff.aggregators.clipping_factory , אם הערכים הם להיות מקוטעים לפני המיצוע.

עיין קודם הכוונון מומלץ מצבורים ללימוד הדרכה לשימושי receommended של מנגנון הרכב באמצעות מפעלים קיימים tff.aggregators מודול.

שיטות עבודה מומלצות לפי דוגמה

אנחנו הולכים כדי להמחיש את tff.aggregators מושג בפירוט על ידי יישום משימת דוגמא פשוט, ולהפוך אותו יותר ויותר כלליים. דרך נוספת ללמוד היא להסתכל על היישום של מפעלים קיימים.

import collections
import tensorflow as tf
import tensorflow_federated as tff

במקום סיכום value , המשימה למשל היא לסכם value * 2.0 ולאחר מכן לחלק את הסכום על ידי 2.0 . תוצאת הצבירה היא ובכך השווה מתמטי סיכום ישירות value , ועלולה להיחשב מורכב משלושה חלקים: (1) קנו מידה על לקוחות (2) סיכום פני לקוחות (3) unscaling על שרת.

בעקבות העיצוב שהוסבר לעיל, ההיגיון ייושם כפי תת מחלקה של tff.aggregators.UnweightedAggregationFactory , יוצרת המתאימה tff.templates.AggregationProcess כאשר קבלו value_type כדי המצרפי:

יישום מינימלי

עבור המשימה לדוגמה, החישובים הדרושים הם תמיד זהים, כך שאין צורך בשימוש במצב. הוא ובכך לרוקן, ו מיוצג tff.federated_value((), tff.SERVER) . כך גם לגבי מדידות, לעת עתה.

היישום המינימלי של המשימה הוא אפוא כדלקמן:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

ניתן לאמת אם הכל עובד כמצופה באמצעות הקוד הבא:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

אמינות ומדידות

נעשה שימוש נרחב ב-Statefulness ב-TFF כדי לייצג חישובים שצפויים להתבצע באופן איטרטיבי ולהשתנות עם כל איטרציה. לדוגמה, המצב של חישוב למידה מכיל את משקלי המודל הנלמד.

כדי להמחיש כיצד להשתמש במצב בחישוב צבירה, אנו משנים את המשימה לדוגמה. במקום הכפלת value על ידי 2.0 , נכפיל את זה על ידי מדד איטרציה - מספר הפעמים צבירה כבר להורג.

לשם כך, אנו זקוקים לדרך לעקוב אחר מדד האיטרציה, אשר מושג באמצעות מושג המדינה. בשנתי ה initialize_fn , במקום ליצור מדינה ריקה, אנו לאתחל את המדינה להיות אפס סקלר. ואז, מדינה יכולה לשמש next_fn בשלושה שלבים: (1) תוספת של 1.0 , (2) שימוש להכפיל value , ו (3) בתמורה כמדינת העדכון החדשה.

ברגע שזה נעשה, ייתכן לב: אבל בדיוק באותו קוד כאמור יכול לשמש כדי לאמת את כל העבודות כצפוי. איך אני יודע שמשהו באמת השתנה?

שאלה טובה! זה המקום שבו מושג המדידות הופך שימושי. באופן כללי, מדידות יכולות לדווח על כול ערך רלוונטי ביצוע יחיד של next הפונקציה, אשר יכול לשמש לניטור. במקרה זה, זה יכול להיות summed_value מהדוגמה הקודמת. כלומר, הערך שלפני שלב "ביטול קנה מידה", שאמור להיות תלוי במדד האיטרציה. שוב, זה לא בהכרח שימושי בפועל, אבל ממחיש את המנגנון הרלוונטי.

התשובה הממלכתית למשימה נראית כך:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

הערה כי state שמגיע לתוך next_fn כקלט מושם על השרת. על מנת להשתמש בו לקוחות, היא זקוקה ראשונה שתועבר, אשר מושגת באמצעות tff.federated_broadcast מפעיל.

כדי לאמת את כל העבודות כצפוי, עכשיו אנחנו יכולים להסתכל דיווחו measurements , שאמור להיות שונה עם כל סיבוב של ביצוע, גם אם בטווח באותה client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

סוגים מובנים

משקלי המודל של מודל מאומן בלמידה מאוחדת מיוצגים בדרך כלל כאוסף של טנסורים, ולא כטנזור בודד. בשנת TFF, זה מיוצג tff.StructType ומפעלי צבירה שימושיים בדרך כלל צריכים להיות מסוגל לקבל את הסוגים המובנים.

עם זאת, בדוגמא לעיל, עבדנו רק עם tff.TensorType אובייקט. אם ננסה להשתמש במפעל הקודם כדי ליצור תהליך צבירה עם tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , אנחנו מקבלים שגיאה מוזרה בגלל TensorFlow ינסה להכפיל tf.Tensor וכן list .

הבעיה היא שבמקום הכפלת מבנה tensors בקבוע, אנחנו צריכים להכפיל כל מותח במבנה בקבוע. הפתרון השכיח לבעיה זו הוא להשתמש tf.nest בתוך מודול של יצר tff.tf_computation הים.

הגרסה של הקודם ExampleTaskFactory תואם סוגים מובנים ובכך נראה כדלקמן:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

דוגמה זו מדגישה דפוס שעשוי להיות שימושי לביצוע בעת בניית קוד TFF. כאשר לא מתמודד עם פעולות פשוט מאוד, את הקוד הופך יותר קריא כאשר tff.tf_computation הים כי ישמש לבנייה בלוקים בתוך tff.federated_computation נוצר במקום נפרד. בפנים של tff.federated_computation , אבני הבניין האלה מחוברים רק באמצעות מפעילי מהותי.

כדי לוודא שזה עובד כצפוי:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

צבירות פנימיות

השלב האחרון הוא אופציונלי לאפשר האצלה של הצבירה בפועל למפעלים אחרים, על מנת לאפשר הרכבה קלה של טכניקות צבירה שונות.

זו מושגת על ידי יצירת אופציונלי inner_factory הטיעון של בנאי של שלנו ExampleTaskFactory . אם לא צוין, tff.aggregators.SumFactory משמש, המחיל את tff.federated_sum המפעיל המשמש ישירות בסעיף הקודם.

כאשר create נקרא, נוכל להתקשר ראשון create של inner_factory ליצור תהליך הצבירה הפנימי עם אותו value_type .

מצב התהליך שלנו שמחזיר initialize_fn הוא רכב משני חלקים: המדינה נוצרה על ידי "זה" תהליך, ומצב התהליך הפנימי עתה יצר.

יישום next_fn השונה כי הצבירה בפועל לאצילה next הפונקציה של התהליך הפנימי, וכן כיצד התוצר הסופי מורכב. המדינה היא שוב מורכבת ו "זה" מדינה "פנימית", ומדידות מורכבות בצורה דומה כמו OrderedDict .

להלן יישום של דפוס כזה.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

כאשר האצלה אל inner_process.next הפונקציה, המבנה ובתמורה אנו מקבלים הוא tff.templates.MeasuredProcessOutput , עם אותם השלושה שדות - state , result ו measurements . בעת יצירת מבנה התשואה הכוללת של תהליך הצבירה המורכב, את state ואת measurements השדות צריכים להיות מורכבים בדרך כלל וחזרו יחד. לעומת זאת, result תואמת השדה לערך נצבר ובמקום "זורם דרך" ההצטברות המורכבת.

state האובייקט יש לראות פרט ליישום המפעל, ובכך הרכב יכול להיות של כול מבנה. עם זאת, measurements מתאימות ערכים ידווחו למשתמש בשלב כלשהו. לכן, אנו ממליצים להשתמש OrderedDict , עם מורכב שמות כאלה שזה יהיה ברור היכן רכב עושה דיווח מטרי מגיע.

שים לב גם את השימוש של tff.federated_zip המפעיל. state האובייקט contolled ידי התהליך שנוצר צריך להיות tff.FederatedType . אם כן חזרנו במקום (this_state, inner_state) ב initialize_fn , חתימת סוג החזרה שלה תהיה tff.StructType המכיל 2-tuple של tff.FederatedType ים. השימוש tff.federated_zip "מעליות" את tff.FederatedType לרמה העליונה. זה משמש באופן דומה next_fn בעת הכנת המדינה והמדידות תוחזרנה.

לבסוף, אנו יכולים לראות כיצד ניתן להשתמש בזה עם הצבירה הפנימית המוגדרת כברירת מחדל:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... ועם צבירה פנימית אחרת. לדוגמה, ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

סיכום

במדריך זה, הסברנו את שיטות העבודה המומלצות שיש לבצע כדי ליצור אבן בניין צבירה למטרות כלליות, המיוצגת כמפעל צבירה. הכלליות מגיעה דרך כוונת העיצוב בשתי דרכים:

  1. חישובים עם פרמטרים. הצבירה מהווה אבן בניין עצמאית שיכול להיות פקוק לתוך מודולים TFF אחרים שנועדו לעבודה עם tff.aggregators כדי parameterize צבירת דרוש שלהם, כגון tff.learning.build_federated_averaging_process .
  2. הרכב צבירה. ניתן להרכיב אבן בניין צבירה עם אבני בניין אחרות של צבירה כדי ליצור צבירה מורכבת יותר.