การนำ Custom Agregations ไปใช้

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

ในการกวดวิชานี้เราจะอธิบายหลักการออกแบบที่อยู่เบื้องหลัง tff.aggregators โมดูลและการปฏิบัติที่ดีที่สุดสำหรับการดำเนินการรวมที่กำหนดเองของค่าจากลูกค้าไปยังเซิร์ฟเวอร์

ข้อกำหนดเบื้องต้น กวดวิชานี้จะถือว่าคุณมีอยู่แล้วคุ้นเคยกับแนวคิดพื้นฐานของ สหพันธ์หลัก เช่นตำแหน่ง ( tff.SERVER , tff.CLIENTS ) วิธีฉิบหายแสดงให้เห็นถึงการคำนวณ ( tff.tf_computation , tff.federated_computation ) และลายเซ็นของพวกเขาชนิด

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

สรุปการออกแบบ

ในฉิบหาย "รวม" หมายถึงการเคลื่อนไหวของชุดของค่าในที่ tff.CLIENTS ในการผลิตมูลค่ารวมของชนิดเดียวกันใน tff.SERVER นั่นคือ ค่าลูกค้าแต่ละรายไม่จำเป็นต้องมี ตัวอย่างเช่น ในการเรียนรู้แบบสหพันธรัฐ การอัปเดตโมเดลไคลเอ็นต์จะถูกเฉลี่ยเพื่อรับการอัปเดตโมเดลรวมเพื่อนำไปใช้กับโมเดลส่วนกลางบนเซิร์ฟเวอร์

นอกจากนี้ผู้ประกอบการที่จะบรรลุเป้าหมายนี้เช่น tff.federated_sum , ฉิบหายให้ tff.templates.AggregationProcess (เป็น กระบวนการ stateful ) ซึ่ง formalizes ลายเซ็นชนิดสำหรับการคำนวณรวมเพื่อที่จะสามารถพูดคุยกับรูปแบบที่ซับซ้อนมากขึ้นกว่าผลรวมที่เรียบง่าย

ส่วนประกอบหลักของ tff.aggregators โมดูลโรงงานสำหรับการสร้าง AggregationProcess ซึ่งถูกออกแบบมาให้โดยทั่วไปที่มีประโยชน์และ replacable สร้างบล็อคของฉิบหายในสองด้าน:

  1. การคำนวณแบบกำหนดพารามิเตอร์ การรวมเป็นกลุ่มอาคารอิสระที่สามารถเสียบเข้ากับโมดูลอื่น ๆ ฉิบหายออกแบบมาเพื่อทำงานร่วมกับ tff.aggregators เพื่อ parameterize รวมที่จำเป็นของพวกเขา

ตัวอย่าง:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. องค์ประกอบการรวม เอกสารสำเร็จรูปการรวมสามารถประกอบกับบล็อคส่วนประกอบการรวมอื่นๆ เพื่อสร้างการรวมแบบผสมที่ซับซ้อนมากขึ้น

ตัวอย่าง:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

ส่วนที่เหลือของบทช่วยสอนนี้จะอธิบายว่าทั้งสองบรรลุเป้าหมายได้อย่างไร

กระบวนการรวบรวม

ครั้งแรกที่เราสรุป tff.templates.AggregationProcess และตามด้วยรูปแบบโรงงานสำหรับการสร้าง

tff.templates.AggregationProcess เป็น tff.templates.MeasuredProcess กับประเภทลายเซ็นที่ระบุไว้สำหรับการรวมตัว โดยเฉพาะอย่างยิ่ง initialize และ next ฟังก์ชั่นที่มีลายเซ็นประเภทต่อไปนี้:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

รัฐ (ประเภท state_type ) ต้องถูกวางไว้ที่เซิร์ฟเวอร์ next ฟังก์ชั่นใช้เวลาเป็นอาร์กิวเมนต์การป้อนข้อมูลของรัฐและค่าที่จะถูกรวม (ประเภท value_type ) อยู่ที่ลูกค้า * หมายถึงตัวเลือกข้อโต้แย้งป้อนข้อมูลอื่น ๆ สำหรับน้ำหนักเช่นในค่าเฉลี่ยถ่วงน้ำหนัก ส่งคืนอ็อบเจ็กต์สถานะที่อัปเดต ค่ารวมของประเภทเดียวกันที่วางไว้ที่เซิร์ฟเวอร์ และการวัดบางส่วน

โปรดทราบว่าทั้งของรัฐที่จะส่งผ่านระหว่างการประหารชีวิตของ next ฟังก์ชั่นและการวัดการรายงานวัตถุประสงค์เพื่อรายงานข้อมูลใด ๆ ขึ้นอยู่กับการดำเนินการของการ next ฟังก์ชั่นอาจจะว่างเปล่า อย่างไรก็ตาม ต้องระบุให้ชัดเจนเพื่อให้ส่วนอื่นๆ ของ TFF มีสัญญาที่ชัดเจนให้ปฏิบัติตาม

โมดูลฉิบหายอื่น ๆ เช่นการปรับปรุงรูปแบบใน tff.learning ที่คาดว่าจะใช้ tff.templates.AggregationProcess เพื่อ parameterize วิธีค่ามีการรวบรวม อย่างไรก็ตาม ค่าที่รวมกันคืออะไรและประเภทของลายเซ็นคืออะไร ขึ้นอยู่กับรายละเอียดอื่นๆ ของโมเดลที่กำลังฝึกและอัลกอริทึมการเรียนรู้ที่ใช้ทำ

เพื่อให้เป็นอิสระจากด้านอื่น ๆ ของการคำนวณรวมเราจะใช้รูปแบบโรงงาน - เราสร้างที่เหมาะสม tff.templates.AggregationProcess ครั้งลายเซ็นประเภทที่เกี่ยวข้องของวัตถุที่จะรวมที่มีอยู่โดยอัญเชิญ create วิธีการของโรงงาน การจัดการโดยตรงของกระบวนการรวมจึงจำเป็นสำหรับผู้เขียนห้องสมุดเท่านั้นซึ่งมีหน้าที่รับผิดชอบในการสร้างนี้

โรงงานแปรรูป

มีคลาสแฟกทอรีพื้นฐานที่เป็นนามธรรมสองคลาสสำหรับการรวมแบบไม่ถ่วงน้ำหนักและการรวมแบบถ่วงน้ำหนัก พวกเขา create วิธีการใช้เวลาประเภทลายเซ็นของมูลค่าที่จะรวบรวมและส่งกลับ tff.templates.AggregationProcess สำหรับการรวมตัวของค่าดังกล่าว

กระบวนการที่สร้างขึ้นโดย tff.aggregators.UnweightedAggregationFactory ใช้อาร์กิวเมนต์ใส่สอง (1) รัฐที่เซิร์ฟเวอร์และ (2) ค่าของชนิดที่ระบุ value_type

การใช้ตัวอย่างเป็น tff.aggregators.SumFactory

กระบวนการที่สร้างขึ้นโดย tff.aggregators.WeightedAggregationFactory ใช้เวลาสามข้อโต้แย้งการป้อนข้อมูล: (1) รัฐที่เซิร์ฟเวอร์ (2) ค่าของชนิดที่ระบุ value_type และ (3) น้ำหนักของประเภท weight_type ตามที่ระบุโดยผู้ใช้ของโรงงานเมื่อกล่าวอ้างของมัน create วิธีการ

การใช้ตัวอย่างเป็น tff.aggregators.MeanFactory ซึ่งคำนวณค่าเฉลี่ยถ่วงน้ำหนัก

รูปแบบโรงงานเป็นวิธีที่เราบรรลุเป้าหมายแรกที่ระบุไว้ข้างต้น การรวมกลุ่มนั้นเป็นหน่วยการสร้างที่เป็นอิสระ ตัวอย่างเช่น เมื่อเปลี่ยนตัวแปรรุ่นที่สามารถฝึกได้ การรวมที่ซับซ้อนไม่จำเป็นต้องเปลี่ยน โรงงานที่เป็นตัวแทนของมันจะถูกเรียกด้วยลายเซ็นของประเภทที่แตกต่างกันเมื่อนำมาใช้โดยวิธีการเช่น tff.learning.build_federated_averaging_process

องค์ประกอบ

โปรดจำไว้ว่ากระบวนการรวมทั่วไปสามารถสรุป (a) การประมวลผลล่วงหน้าของค่าที่ไคลเอนต์ (b) การเคลื่อนย้ายค่าจากไคลเอนต์ไปยังเซิร์ฟเวอร์ และ (c) การประมวลผลภายหลังของค่ารวมบางส่วนที่เซิร์ฟเวอร์ เป้าหมายที่สองระบุไว้ข้างต้นองค์ประกอบการรวมเป็นที่ตระหนักภายใน tff.aggregators โมดูลโดยการจัดโครงสร้างการดำเนินงานของโรงงานรวมดังกล่าวว่าเป็นส่วนหนึ่ง (ข) สามารถมอบหมายให้โรงงานรวมอีก

แทนที่จะใช้ตรรกะที่จำเป็นทั้งหมดภายในคลาสโรงงานเดียว โดยค่าเริ่มต้นการนำไปใช้จะเน้นที่ด้านเดียวที่เกี่ยวข้องกับการรวม เมื่อจำเป็น รูปแบบนี้จะทำให้เราสามารถแทนที่บล็อคการสร้างทีละครั้ง

ตัวอย่างคือถ่วงน้ำหนัก tff.aggregators.MeanFactory การใช้งานจะคูณค่าและน้ำหนักที่ให้ไว้กับไคลเอ็นต์ จากนั้นจึงรวมทั้งค่าถ่วงน้ำหนักและน้ำหนักแยกกัน จากนั้นจึงแบ่งผลรวมของค่าที่ถ่วงน้ำหนักด้วยผลรวมของน้ำหนักที่เซิร์ฟเวอร์ แต่ในการดำเนินการ summations โดยตรงโดยใช้ tff.federated_sum ผู้ประกอบการรวมที่มีการมอบหมายให้ทั้งสองกรณีของ tff.aggregators.SumFactory

โครงสร้างดังกล่าวทำให้สามารถแทนที่ผลรวมเริ่มต้นสองค่าโดยโรงงานต่างๆ กัน ซึ่งตระหนักถึงผลรวมที่แตกต่างกัน ยกตัวอย่างเช่น tff.aggregators.SecureSumFactory หรือดำเนินการเองของ tff.aggregators.UnweightedAggregationFactory ตรงกันข้ามเวลา tff.aggregators.MeanFactory สามารถตัวเองจะมีการรวมภายในของโรงงานอีกเช่น tff.aggregators.clipping_factory ถ้าค่าที่จะได้รับการตัดก่อนเฉลี่ย

ดูที่ก่อนหน้านี้ ปรับแต่งรวมตัวแนะนำสำหรับการเรียนรู้ การสอนสำหรับการใช้งานของกลไก receommended องค์ประกอบโดยใช้โรงงานที่มีอยู่ใน tff.aggregators โมดูล

แนวปฏิบัติที่ดีที่สุดตามตัวอย่าง

เราจะแสดงให้เห็นถึง tff.aggregators แนวคิดในรายละเอียดโดยการใช้งานตัวอย่างง่ายๆและทำให้มันมีความก้าวหน้ามากขึ้นทั่วไป อีกวิธีในการเรียนรู้คือการดูการดำเนินงานของโรงงานที่มีอยู่

import collections
import tensorflow as tf
import tensorflow_federated as tff

แทนที่จะข้อสรุป value งานตัวอย่างคือการสรุป value * 2.0 แล้วหารผลรวมจาก 2.0 ผลการรวมจึงเป็นทางคณิตศาสตร์เทียบเท่ากับข้อสรุปโดยตรง value และสามารถจะคิดว่าเป็นประกอบด้วยสามส่วนคือ (1) การปรับขนาดที่ลูกค้า (2) ข้อสรุปทั่วลูกค้า (3) unscaling ที่เซิร์ฟเวอร์

ต่อไปนี้การออกแบบที่อธิบายข้างต้นตรรกะจะถูกนำมาใช้เป็น subclass ของ tff.aggregators.UnweightedAggregationFactory ซึ่งจะสร้างที่เหมาะสม tff.templates.AggregationProcess เมื่อได้รับ value_type จะรวม:

การใช้งานน้อยที่สุด

สำหรับงานตัวอย่าง การคำนวณที่จำเป็นจะเหมือนกันเสมอ ดังนั้นจึงไม่จำเป็นต้องใช้สถานะ มันจึงว่างเปล่าและแสดงเป็น tff.federated_value((), tff.SERVER) เช่นเดียวกับการวัดในขณะนี้

การใช้งานขั้นต่ำของงานจึงเป็นดังนี้:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

ไม่ว่าทุกอย่างจะทำงานตามที่คาดไว้หรือไม่ สามารถตรวจสอบได้ด้วยรหัสต่อไปนี้:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

สภาพและการวัด

สภาพถูกใช้อย่างกว้างๆ ใน ​​TFF เพื่อแสดงถึงการคำนวณที่คาดว่าจะดำเนินการซ้ำๆ และเปลี่ยนแปลงไปกับการวนซ้ำแต่ละครั้ง ตัวอย่างเช่น สถานะของการคำนวณการเรียนรู้มีน้ำหนักของแบบจำลองที่กำลังเรียนรู้

เพื่อแสดงวิธีใช้สถานะในการคำนวณการรวม เราแก้ไขงานตัวอย่าง แทนการคูณ value โดย 2.0 เราคูณด้วยดัชนีซ้ำ - จำนวนครั้งที่สรุปรวมได้รับการดำเนินการ

ในการทำเช่นนั้น เราจำเป็นต้องมีวิธีในการติดตามดัชนีการวนซ้ำ ซึ่งทำได้โดยผ่านแนวคิดเรื่องสถานะ ใน initialize_fn แทนการสร้างสถานะว่างเปล่าเราเริ่มต้นของรัฐที่จะเป็นศูนย์เกลา จากนั้นรัฐสามารถนำมาใช้ใน next_fn ในสามขั้นตอนคือ (1) เพิ่มขึ้นจาก 1.0 (2) ที่ใช้ในการคูณ value และ (3) การกลับมาเป็นรัฐที่มีการปรับปรุงใหม่

ครั้งนี้จะทำคุณอาจทราบ: แต่ตรงรหัสเดียวกันกับข้างต้นสามารถนำมาใช้ในการตรวจสอบผลงานทั้งหมดตามที่คาดไว้ ฉันจะรู้ได้อย่างไรว่ามีบางอย่างเปลี่ยนแปลงไปจริงๆ

คำถามที่ดี! นี่คือจุดที่แนวคิดของการวัดมีประโยชน์ โดยทั่วไปการวัดสามารถรายงานค่าใด ๆ ที่เกี่ยวข้องกับการดำเนินการเดียวของ next ฟังก์ชั่นซึ่งสามารถนำมาใช้สำหรับการตรวจสอบ ในกรณีนี้จะสามารถ summed_value จากตัวอย่างก่อนหน้านี้ นั่นคือค่าก่อนขั้นตอน "unscaling" ซึ่งควรขึ้นอยู่กับดัชนีการวนซ้ำ อีกครั้ง ไม่จำเป็นว่ามีประโยชน์ในทางปฏิบัติ แต่แสดงให้เห็นกลไกที่เกี่ยวข้อง

คำตอบของ stateful สำหรับงานจึงมีลักษณะดังนี้:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

โปรดทราบว่า state ที่เข้ามาใน next_fn เป็น input ถูกวางไว้ที่เซิร์ฟเวอร์ เพื่อที่จะใช้มันที่ลูกค้ามันเป็นครั้งแรกที่จะต้องมีการสื่อสารซึ่งประสบความสำเร็จโดยใช้ tff.federated_broadcast ผู้ประกอบการ

เพื่อตรวจสอบผลงานทั้งหมดเป็นไปตามคาดขณะนี้เราสามารถดูรายงาน measurements ซึ่งควรจะแตกต่างกันด้วยในแต่ละรอบของการดำเนินการแม้ว่าการทำงานแบบเดียวกับ client_data

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

โครงสร้างประเภท

น้ำหนักแบบจำลองของแบบจำลองที่ได้รับการฝึกฝนในการเรียนรู้แบบสหพันธรัฐมักจะแสดงเป็นชุดของเมตริกซ์ แทนที่จะเป็นเมตริกซ์เดียว ในฉิบหายนี้จะแสดงเป็น tff.StructType และโรงงานรวมที่มีประโยชน์โดยทั่วไปจะต้องมีความสามารถที่จะยอมรับประเภทโครงสร้าง

อย่างไรก็ตามในตัวอย่างข้างต้นเราจะทำงานร่วมกับ tff.TensorType วัตถุ ถ้าเราพยายามที่จะใช้โรงงานก่อนหน้านี้ในการสร้างกระบวนการการรวมตัวกับ tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) เราได้รับข้อผิดพลาดแปลกเพราะ TensorFlow จะพยายามที่จะคูณ tf.Tensor และ list

ปัญหาคือว่าแทนที่จะคูณโครงสร้างของเทนเซอร์โดยคงที่เราต้องคูณแต่ละเมตริกซ์ในโครงสร้างโดยคงที่ วิธีการแก้ปัญหาตามปกติเพื่อแก้ไขปัญหานี้คือการใช้ tf.nest ภายในโมดูลที่สร้าง tff.tf_computation s

รุ่นก่อนหน้า ExampleTaskFactory เข้ากันได้กับประเภทโครงสร้างจึงมีลักษณะดังต่อไปนี้:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

ตัวอย่างนี้เน้นรูปแบบที่อาจเป็นประโยชน์ในการปฏิบัติตามเมื่อจัดโครงสร้างโค้ด TFF เมื่อไม่ได้จัดการกับการดำเนินงานที่ง่ายมากรหัสจะกลายเป็นที่ชัดเจนมากขึ้นเมื่อ tff.tf_computation s ที่จะนำมาใช้กับการสร้างบล็อกภายใน tff.federated_computation จะถูกสร้างขึ้นในสถานที่แยกต่างหาก ภายในของ tff.federated_computation , ก่อสร้างตึกเหล่านี้มีการเชื่อมต่อใช้เฉพาะผู้ประกอบการที่แท้จริง

ในการตรวจสอบว่ามันทำงานตามที่คาดไว้:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

การรวมตัวภายใน

ขั้นตอนสุดท้ายคือการเปิดใช้งานทางเลือกในการมอบหมายการรวมจริงไปยังโรงงานอื่นๆ เพื่อให้ง่ายต่อการจัดองค์ประกอบของเทคนิคการรวมกลุ่มที่แตกต่างกัน

นี่คือความสำเร็จโดยการสร้างตัวเลือก inner_factory โต้แย้งในตัวสร้างของเรา ExampleTaskFactory ถ้าไม่ได้ระบุ tff.aggregators.SumFactory ถูกนำมาใช้ซึ่งนำไปใช้ tff.federated_sum ผู้ประกอบการมาใช้โดยตรงในส่วนก่อนหน้า

เมื่อ create เรียกว่าเราสามารถโทรครั้งแรก create ของ inner_factory เพื่อสร้างกระบวนการการรวมภายในด้วยเหมือนกัน value_type

สถานะของกระบวนการของเราส่งกลับโดย initialize_fn เป็นองค์ประกอบสองส่วนรัฐที่สร้างขึ้นโดย "นี้" กระบวนการและสถานะของกระบวนการภายในที่สร้างขึ้นเพียง

การดำเนินงานของ next_fn แตกต่างกันในการที่รวมตัวเกิดขึ้นจริงจะมอบหมายให้ next การทำงานของกระบวนการภายในและในวิธีการที่ผลลัพธ์สุดท้ายประกอบด้วย รัฐประกอบด้วยอีกครั้งของ "นี้" และรัฐ "ภายใน" และวัดที่มีองค์ประกอบในลักษณะที่คล้ายกันเป็น OrderedDict

ต่อไปนี้คือการดำเนินการตามรูปแบบดังกล่าว

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

เมื่อมอบหมายไป inner_process.next ฟังก์ชั่นโครงสร้างผลตอบแทนที่เราได้รับคือ tff.templates.MeasuredProcessOutput เดียวกับที่ทุ่งสาม - state , result และ measurements เมื่อมีการสร้างโครงสร้างผลตอบแทนโดยรวมของกระบวนการการรวมประกอบด้วยที่ state และ measurements สาขาที่ควรจะองค์ประกอบโดยทั่วไปและกลับมาอยู่ด้วยกัน ในทางตรงกันข้าม result สอดคล้องกับข้อมูลค่าที่ถูกรวบรวมและแทนที่จะ "ไหลผ่าน" รวมประกอบด้วย

state วัตถุควรจะเห็นเป็นรายละเอียดการดำเนินงานของโรงงานและทำให้องค์ประกอบอาจจะมีโครงสร้างใด ๆ แต่ measurements สอดคล้องกับค่านิยมที่จะมีการรายงานให้กับผู้ใช้ในบางจุด ดังนั้นเราจึงแนะนำให้ใช้ OrderedDict กับประกอบด้วยการตั้งชื่อดังกล่าวว่าจะมีความชัดเจนในองค์ประกอบที่ต่อไม่รายงานตัวชี้วัดมาจาก

ยังทราบการใช้งานของ tff.federated_zip ผู้ประกอบการ state วัตถุ contolled โดยกระบวนการสร้างควรจะเป็น tff.FederatedType ถ้าเราได้กลับมาแทน (this_state, inner_state) ใน initialize_fn ลายเซ็นประเภทผลตอบแทนจะเป็น tff.StructType มี 2 tuple ของ tff.FederatedType s การใช้ tff.federated_zip "ลิฟท์ที่" tff.FederatedType ถึงระดับด้านบน นี้จะใช้ในทำนองเดียวกันใน next_fn เมื่อเตรียมรัฐและการวัดจะถูกส่งกลับ

สุดท้ายนี้ เราสามารถดูว่าสิ่งนี้สามารถนำไปใช้กับการรวมภายในที่เป็นค่าเริ่มต้นได้อย่างไร:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... และด้วยการรวมภายในที่แตกต่างกัน ตัวอย่างเช่น ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

สรุป

ในบทช่วยสอนนี้ เราได้อธิบายแนวทางปฏิบัติที่ดีที่สุดที่ควรปฏิบัติตามเพื่อสร้างบล็อคส่วนประกอบการรวมวัตถุประสงค์ทั่วไป ซึ่งแสดงเป็นโรงงานรวม ลักษณะทั่วไปมาจากความตั้งใจในการออกแบบในสองวิธี:

  1. การคำนวณแบบกำหนดพารามิเตอร์ การรวมเป็นกลุ่มอาคารอิสระที่สามารถเสียบเข้ากับโมดูลอื่น ๆ ฉิบหายออกแบบมาเพื่อทำงานร่วมกับ tff.aggregators เพื่อ parameterize รวมที่จำเป็นของพวกเขาเช่น tff.learning.build_federated_averaging_process
  2. องค์ประกอบการรวม เอกสารสำเร็จรูปการรวมสามารถประกอบกับบล็อคส่วนประกอบการรวมอื่นๆ เพื่อสร้างการรวมแบบผสมที่ซับซ้อนมากขึ้น