ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ในการกวดวิชานี้เราจะอธิบายหลักการออกแบบที่อยู่เบื้องหลัง tff.aggregators
โมดูลและการปฏิบัติที่ดีที่สุดสำหรับการดำเนินการรวมที่กำหนดเองของค่าจากลูกค้าไปยังเซิร์ฟเวอร์
ข้อกำหนดเบื้องต้น กวดวิชานี้จะถือว่าคุณมีอยู่แล้วคุ้นเคยกับแนวคิดพื้นฐานของ สหพันธ์หลัก เช่นตำแหน่ง ( tff.SERVER
, tff.CLIENTS
) วิธีฉิบหายแสดงให้เห็นถึงการคำนวณ ( tff.tf_computation
, tff.federated_computation
) และลายเซ็นของพวกเขาชนิด
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
สรุปการออกแบบ
ในฉิบหาย "รวม" หมายถึงการเคลื่อนไหวของชุดของค่าในที่ tff.CLIENTS
ในการผลิตมูลค่ารวมของชนิดเดียวกันใน tff.SERVER
นั่นคือ ค่าลูกค้าแต่ละรายไม่จำเป็นต้องมี ตัวอย่างเช่น ในการเรียนรู้แบบสหพันธรัฐ การอัปเดตโมเดลไคลเอ็นต์จะถูกเฉลี่ยเพื่อรับการอัปเดตโมเดลรวมเพื่อนำไปใช้กับโมเดลส่วนกลางบนเซิร์ฟเวอร์
นอกจากนี้ผู้ประกอบการที่จะบรรลุเป้าหมายนี้เช่น tff.federated_sum
, ฉิบหายให้ tff.templates.AggregationProcess
(เป็น กระบวนการ stateful ) ซึ่ง formalizes ลายเซ็นชนิดสำหรับการคำนวณรวมเพื่อที่จะสามารถพูดคุยกับรูปแบบที่ซับซ้อนมากขึ้นกว่าผลรวมที่เรียบง่าย
ส่วนประกอบหลักของ tff.aggregators
โมดูลโรงงานสำหรับการสร้าง AggregationProcess
ซึ่งถูกออกแบบมาให้โดยทั่วไปที่มีประโยชน์และ replacable สร้างบล็อคของฉิบหายในสองด้าน:
- การคำนวณแบบกำหนดพารามิเตอร์ การรวมเป็นกลุ่มอาคารอิสระที่สามารถเสียบเข้ากับโมดูลอื่น ๆ ฉิบหายออกแบบมาเพื่อทำงานร่วมกับ
tff.aggregators
เพื่อ parameterize รวมที่จำเป็นของพวกเขา
ตัวอย่าง:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- องค์ประกอบการรวม เอกสารสำเร็จรูปการรวมสามารถประกอบกับบล็อคส่วนประกอบการรวมอื่นๆ เพื่อสร้างการรวมแบบผสมที่ซับซ้อนมากขึ้น
ตัวอย่าง:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
ส่วนที่เหลือของบทช่วยสอนนี้จะอธิบายว่าทั้งสองบรรลุเป้าหมายได้อย่างไร
กระบวนการรวบรวม
ครั้งแรกที่เราสรุป tff.templates.AggregationProcess
และตามด้วยรูปแบบโรงงานสำหรับการสร้าง
tff.templates.AggregationProcess
เป็น tff.templates.MeasuredProcess
กับประเภทลายเซ็นที่ระบุไว้สำหรับการรวมตัว โดยเฉพาะอย่างยิ่ง initialize
และ next
ฟังก์ชั่นที่มีลายเซ็นประเภทต่อไปนี้:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
รัฐ (ประเภท state_type
) ต้องถูกวางไว้ที่เซิร์ฟเวอร์ next
ฟังก์ชั่นใช้เวลาเป็นอาร์กิวเมนต์การป้อนข้อมูลของรัฐและค่าที่จะถูกรวม (ประเภท value_type
) อยู่ที่ลูกค้า *
หมายถึงตัวเลือกข้อโต้แย้งป้อนข้อมูลอื่น ๆ สำหรับน้ำหนักเช่นในค่าเฉลี่ยถ่วงน้ำหนัก ส่งคืนอ็อบเจ็กต์สถานะที่อัปเดต ค่ารวมของประเภทเดียวกันที่วางไว้ที่เซิร์ฟเวอร์ และการวัดบางส่วน
โปรดทราบว่าทั้งของรัฐที่จะส่งผ่านระหว่างการประหารชีวิตของ next
ฟังก์ชั่นและการวัดการรายงานวัตถุประสงค์เพื่อรายงานข้อมูลใด ๆ ขึ้นอยู่กับการดำเนินการของการ next
ฟังก์ชั่นอาจจะว่างเปล่า อย่างไรก็ตาม ต้องระบุให้ชัดเจนเพื่อให้ส่วนอื่นๆ ของ TFF มีสัญญาที่ชัดเจนให้ปฏิบัติตาม
โมดูลฉิบหายอื่น ๆ เช่นการปรับปรุงรูปแบบใน tff.learning
ที่คาดว่าจะใช้ tff.templates.AggregationProcess
เพื่อ parameterize วิธีค่ามีการรวบรวม อย่างไรก็ตาม ค่าที่รวมกันคืออะไรและประเภทของลายเซ็นคืออะไร ขึ้นอยู่กับรายละเอียดอื่นๆ ของโมเดลที่กำลังฝึกและอัลกอริทึมการเรียนรู้ที่ใช้ทำ
เพื่อให้เป็นอิสระจากด้านอื่น ๆ ของการคำนวณรวมเราจะใช้รูปแบบโรงงาน - เราสร้างที่เหมาะสม tff.templates.AggregationProcess
ครั้งลายเซ็นประเภทที่เกี่ยวข้องของวัตถุที่จะรวมที่มีอยู่โดยอัญเชิญ create
วิธีการของโรงงาน การจัดการโดยตรงของกระบวนการรวมจึงจำเป็นสำหรับผู้เขียนห้องสมุดเท่านั้นซึ่งมีหน้าที่รับผิดชอบในการสร้างนี้
โรงงานแปรรูป
มีคลาสแฟกทอรีพื้นฐานที่เป็นนามธรรมสองคลาสสำหรับการรวมแบบไม่ถ่วงน้ำหนักและการรวมแบบถ่วงน้ำหนัก พวกเขา create
วิธีการใช้เวลาประเภทลายเซ็นของมูลค่าที่จะรวบรวมและส่งกลับ tff.templates.AggregationProcess
สำหรับการรวมตัวของค่าดังกล่าว
กระบวนการที่สร้างขึ้นโดย tff.aggregators.UnweightedAggregationFactory
ใช้อาร์กิวเมนต์ใส่สอง (1) รัฐที่เซิร์ฟเวอร์และ (2) ค่าของชนิดที่ระบุ value_type
การใช้ตัวอย่างเป็น tff.aggregators.SumFactory
กระบวนการที่สร้างขึ้นโดย tff.aggregators.WeightedAggregationFactory
ใช้เวลาสามข้อโต้แย้งการป้อนข้อมูล: (1) รัฐที่เซิร์ฟเวอร์ (2) ค่าของชนิดที่ระบุ value_type
และ (3) น้ำหนักของประเภท weight_type
ตามที่ระบุโดยผู้ใช้ของโรงงานเมื่อกล่าวอ้างของมัน create
วิธีการ
การใช้ตัวอย่างเป็น tff.aggregators.MeanFactory
ซึ่งคำนวณค่าเฉลี่ยถ่วงน้ำหนัก
รูปแบบโรงงานเป็นวิธีที่เราบรรลุเป้าหมายแรกที่ระบุไว้ข้างต้น การรวมกลุ่มนั้นเป็นหน่วยการสร้างที่เป็นอิสระ ตัวอย่างเช่น เมื่อเปลี่ยนตัวแปรรุ่นที่สามารถฝึกได้ การรวมที่ซับซ้อนไม่จำเป็นต้องเปลี่ยน โรงงานที่เป็นตัวแทนของมันจะถูกเรียกด้วยลายเซ็นของประเภทที่แตกต่างกันเมื่อนำมาใช้โดยวิธีการเช่น tff.learning.build_federated_averaging_process
องค์ประกอบ
โปรดจำไว้ว่ากระบวนการรวมทั่วไปสามารถสรุป (a) การประมวลผลล่วงหน้าของค่าที่ไคลเอนต์ (b) การเคลื่อนย้ายค่าจากไคลเอนต์ไปยังเซิร์ฟเวอร์ และ (c) การประมวลผลภายหลังของค่ารวมบางส่วนที่เซิร์ฟเวอร์ เป้าหมายที่สองระบุไว้ข้างต้นองค์ประกอบการรวมเป็นที่ตระหนักภายใน tff.aggregators
โมดูลโดยการจัดโครงสร้างการดำเนินงานของโรงงานรวมดังกล่าวว่าเป็นส่วนหนึ่ง (ข) สามารถมอบหมายให้โรงงานรวมอีก
แทนที่จะใช้ตรรกะที่จำเป็นทั้งหมดภายในคลาสโรงงานเดียว โดยค่าเริ่มต้นการนำไปใช้จะเน้นที่ด้านเดียวที่เกี่ยวข้องกับการรวม เมื่อจำเป็น รูปแบบนี้จะทำให้เราสามารถแทนที่บล็อคการสร้างทีละครั้ง
ตัวอย่างคือถ่วงน้ำหนัก tff.aggregators.MeanFactory
การใช้งานจะคูณค่าและน้ำหนักที่ให้ไว้กับไคลเอ็นต์ จากนั้นจึงรวมทั้งค่าถ่วงน้ำหนักและน้ำหนักแยกกัน จากนั้นจึงแบ่งผลรวมของค่าที่ถ่วงน้ำหนักด้วยผลรวมของน้ำหนักที่เซิร์ฟเวอร์ แต่ในการดำเนินการ summations โดยตรงโดยใช้ tff.federated_sum
ผู้ประกอบการรวมที่มีการมอบหมายให้ทั้งสองกรณีของ tff.aggregators.SumFactory
โครงสร้างดังกล่าวทำให้สามารถแทนที่ผลรวมเริ่มต้นสองค่าโดยโรงงานต่างๆ กัน ซึ่งตระหนักถึงผลรวมที่แตกต่างกัน ยกตัวอย่างเช่น tff.aggregators.SecureSumFactory
หรือดำเนินการเองของ tff.aggregators.UnweightedAggregationFactory
ตรงกันข้ามเวลา tff.aggregators.MeanFactory
สามารถตัวเองจะมีการรวมภายในของโรงงานอีกเช่น tff.aggregators.clipping_factory
ถ้าค่าที่จะได้รับการตัดก่อนเฉลี่ย
ดูที่ก่อนหน้านี้ ปรับแต่งรวมตัวแนะนำสำหรับการเรียนรู้ การสอนสำหรับการใช้งานของกลไก receommended องค์ประกอบโดยใช้โรงงานที่มีอยู่ใน tff.aggregators
โมดูล
แนวปฏิบัติที่ดีที่สุดตามตัวอย่าง
เราจะแสดงให้เห็นถึง tff.aggregators
แนวคิดในรายละเอียดโดยการใช้งานตัวอย่างง่ายๆและทำให้มันมีความก้าวหน้ามากขึ้นทั่วไป อีกวิธีในการเรียนรู้คือการดูการดำเนินงานของโรงงานที่มีอยู่
import collections
import tensorflow as tf
import tensorflow_federated as tff
แทนที่จะข้อสรุป value
งานตัวอย่างคือการสรุป value * 2.0
แล้วหารผลรวมจาก 2.0
ผลการรวมจึงเป็นทางคณิตศาสตร์เทียบเท่ากับข้อสรุปโดยตรง value
และสามารถจะคิดว่าเป็นประกอบด้วยสามส่วนคือ (1) การปรับขนาดที่ลูกค้า (2) ข้อสรุปทั่วลูกค้า (3) unscaling ที่เซิร์ฟเวอร์
ต่อไปนี้การออกแบบที่อธิบายข้างต้นตรรกะจะถูกนำมาใช้เป็น subclass ของ tff.aggregators.UnweightedAggregationFactory
ซึ่งจะสร้างที่เหมาะสม tff.templates.AggregationProcess
เมื่อได้รับ value_type
จะรวม:
การใช้งานน้อยที่สุด
สำหรับงานตัวอย่าง การคำนวณที่จำเป็นจะเหมือนกันเสมอ ดังนั้นจึงไม่จำเป็นต้องใช้สถานะ มันจึงว่างเปล่าและแสดงเป็น tff.federated_value((), tff.SERVER)
เช่นเดียวกับการวัดในขณะนี้
การใช้งานขั้นต่ำของงานจึงเป็นดังนี้:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
ไม่ว่าทุกอย่างจะทำงานตามที่คาดไว้หรือไม่ สามารถตรวจสอบได้ด้วยรหัสต่อไปนี้:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
สภาพและการวัด
สภาพถูกใช้อย่างกว้างๆ ใน TFF เพื่อแสดงถึงการคำนวณที่คาดว่าจะดำเนินการซ้ำๆ และเปลี่ยนแปลงไปกับการวนซ้ำแต่ละครั้ง ตัวอย่างเช่น สถานะของการคำนวณการเรียนรู้มีน้ำหนักของแบบจำลองที่กำลังเรียนรู้
เพื่อแสดงวิธีใช้สถานะในการคำนวณการรวม เราแก้ไขงานตัวอย่าง แทนการคูณ value
โดย 2.0
เราคูณด้วยดัชนีซ้ำ - จำนวนครั้งที่สรุปรวมได้รับการดำเนินการ
ในการทำเช่นนั้น เราจำเป็นต้องมีวิธีในการติดตามดัชนีการวนซ้ำ ซึ่งทำได้โดยผ่านแนวคิดเรื่องสถานะ ใน initialize_fn
แทนการสร้างสถานะว่างเปล่าเราเริ่มต้นของรัฐที่จะเป็นศูนย์เกลา จากนั้นรัฐสามารถนำมาใช้ใน next_fn
ในสามขั้นตอนคือ (1) เพิ่มขึ้นจาก 1.0
(2) ที่ใช้ในการคูณ value
และ (3) การกลับมาเป็นรัฐที่มีการปรับปรุงใหม่
ครั้งนี้จะทำคุณอาจทราบ: แต่ตรงรหัสเดียวกันกับข้างต้นสามารถนำมาใช้ในการตรวจสอบผลงานทั้งหมดตามที่คาดไว้ ฉันจะรู้ได้อย่างไรว่ามีบางอย่างเปลี่ยนแปลงไปจริงๆ
คำถามที่ดี! นี่คือจุดที่แนวคิดของการวัดมีประโยชน์ โดยทั่วไปการวัดสามารถรายงานค่าใด ๆ ที่เกี่ยวข้องกับการดำเนินการเดียวของ next
ฟังก์ชั่นซึ่งสามารถนำมาใช้สำหรับการตรวจสอบ ในกรณีนี้จะสามารถ summed_value
จากตัวอย่างก่อนหน้านี้ นั่นคือค่าก่อนขั้นตอน "unscaling" ซึ่งควรขึ้นอยู่กับดัชนีการวนซ้ำ อีกครั้ง ไม่จำเป็นว่ามีประโยชน์ในทางปฏิบัติ แต่แสดงให้เห็นกลไกที่เกี่ยวข้อง
คำตอบของ stateful สำหรับงานจึงมีลักษณะดังนี้:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
โปรดทราบว่า state
ที่เข้ามาใน next_fn
เป็น input ถูกวางไว้ที่เซิร์ฟเวอร์ เพื่อที่จะใช้มันที่ลูกค้ามันเป็นครั้งแรกที่จะต้องมีการสื่อสารซึ่งประสบความสำเร็จโดยใช้ tff.federated_broadcast
ผู้ประกอบการ
เพื่อตรวจสอบผลงานทั้งหมดเป็นไปตามคาดขณะนี้เราสามารถดูรายงาน measurements
ซึ่งควรจะแตกต่างกันด้วยในแต่ละรอบของการดำเนินการแม้ว่าการทำงานแบบเดียวกับ client_data
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
โครงสร้างประเภท
น้ำหนักแบบจำลองของแบบจำลองที่ได้รับการฝึกฝนในการเรียนรู้แบบสหพันธรัฐมักจะแสดงเป็นชุดของเมตริกซ์ แทนที่จะเป็นเมตริกซ์เดียว ในฉิบหายนี้จะแสดงเป็น tff.StructType
และโรงงานรวมที่มีประโยชน์โดยทั่วไปจะต้องมีความสามารถที่จะยอมรับประเภทโครงสร้าง
อย่างไรก็ตามในตัวอย่างข้างต้นเราจะทำงานร่วมกับ tff.TensorType
วัตถุ ถ้าเราพยายามที่จะใช้โรงงานก่อนหน้านี้ในการสร้างกระบวนการการรวมตัวกับ tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
เราได้รับข้อผิดพลาดแปลกเพราะ TensorFlow จะพยายามที่จะคูณ tf.Tensor
และ list
ปัญหาคือว่าแทนที่จะคูณโครงสร้างของเทนเซอร์โดยคงที่เราต้องคูณแต่ละเมตริกซ์ในโครงสร้างโดยคงที่ วิธีการแก้ปัญหาตามปกติเพื่อแก้ไขปัญหานี้คือการใช้ tf.nest
ภายในโมดูลที่สร้าง tff.tf_computation
s
รุ่นก่อนหน้า ExampleTaskFactory
เข้ากันได้กับประเภทโครงสร้างจึงมีลักษณะดังต่อไปนี้:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
ตัวอย่างนี้เน้นรูปแบบที่อาจเป็นประโยชน์ในการปฏิบัติตามเมื่อจัดโครงสร้างโค้ด TFF เมื่อไม่ได้จัดการกับการดำเนินงานที่ง่ายมากรหัสจะกลายเป็นที่ชัดเจนมากขึ้นเมื่อ tff.tf_computation
s ที่จะนำมาใช้กับการสร้างบล็อกภายใน tff.federated_computation
จะถูกสร้างขึ้นในสถานที่แยกต่างหาก ภายในของ tff.federated_computation
, ก่อสร้างตึกเหล่านี้มีการเชื่อมต่อใช้เฉพาะผู้ประกอบการที่แท้จริง
ในการตรวจสอบว่ามันทำงานตามที่คาดไว้:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
การรวมตัวภายใน
ขั้นตอนสุดท้ายคือการเปิดใช้งานทางเลือกในการมอบหมายการรวมจริงไปยังโรงงานอื่นๆ เพื่อให้ง่ายต่อการจัดองค์ประกอบของเทคนิคการรวมกลุ่มที่แตกต่างกัน
นี่คือความสำเร็จโดยการสร้างตัวเลือก inner_factory
โต้แย้งในตัวสร้างของเรา ExampleTaskFactory
ถ้าไม่ได้ระบุ tff.aggregators.SumFactory
ถูกนำมาใช้ซึ่งนำไปใช้ tff.federated_sum
ผู้ประกอบการมาใช้โดยตรงในส่วนก่อนหน้า
เมื่อ create
เรียกว่าเราสามารถโทรครั้งแรก create
ของ inner_factory
เพื่อสร้างกระบวนการการรวมภายในด้วยเหมือนกัน value_type
สถานะของกระบวนการของเราส่งกลับโดย initialize_fn
เป็นองค์ประกอบสองส่วนรัฐที่สร้างขึ้นโดย "นี้" กระบวนการและสถานะของกระบวนการภายในที่สร้างขึ้นเพียง
การดำเนินงานของ next_fn
แตกต่างกันในการที่รวมตัวเกิดขึ้นจริงจะมอบหมายให้ next
การทำงานของกระบวนการภายในและในวิธีการที่ผลลัพธ์สุดท้ายประกอบด้วย รัฐประกอบด้วยอีกครั้งของ "นี้" และรัฐ "ภายใน" และวัดที่มีองค์ประกอบในลักษณะที่คล้ายกันเป็น OrderedDict
ต่อไปนี้คือการดำเนินการตามรูปแบบดังกล่าว
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
เมื่อมอบหมายไป inner_process.next
ฟังก์ชั่นโครงสร้างผลตอบแทนที่เราได้รับคือ tff.templates.MeasuredProcessOutput
เดียวกับที่ทุ่งสาม - state
, result
และ measurements
เมื่อมีการสร้างโครงสร้างผลตอบแทนโดยรวมของกระบวนการการรวมประกอบด้วยที่ state
และ measurements
สาขาที่ควรจะองค์ประกอบโดยทั่วไปและกลับมาอยู่ด้วยกัน ในทางตรงกันข้าม result
สอดคล้องกับข้อมูลค่าที่ถูกรวบรวมและแทนที่จะ "ไหลผ่าน" รวมประกอบด้วย
state
วัตถุควรจะเห็นเป็นรายละเอียดการดำเนินงานของโรงงานและทำให้องค์ประกอบอาจจะมีโครงสร้างใด ๆ แต่ measurements
สอดคล้องกับค่านิยมที่จะมีการรายงานให้กับผู้ใช้ในบางจุด ดังนั้นเราจึงแนะนำให้ใช้ OrderedDict
กับประกอบด้วยการตั้งชื่อดังกล่าวว่าจะมีความชัดเจนในองค์ประกอบที่ต่อไม่รายงานตัวชี้วัดมาจาก
ยังทราบการใช้งานของ tff.federated_zip
ผู้ประกอบการ state
วัตถุ contolled โดยกระบวนการสร้างควรจะเป็น tff.FederatedType
ถ้าเราได้กลับมาแทน (this_state, inner_state)
ใน initialize_fn
ลายเซ็นประเภทผลตอบแทนจะเป็น tff.StructType
มี 2 tuple ของ tff.FederatedType
s การใช้ tff.federated_zip
"ลิฟท์ที่" tff.FederatedType
ถึงระดับด้านบน นี้จะใช้ในทำนองเดียวกันใน next_fn
เมื่อเตรียมรัฐและการวัดจะถูกส่งกลับ
สุดท้ายนี้ เราสามารถดูว่าสิ่งนี้สามารถนำไปใช้กับการรวมภายในที่เป็นค่าเริ่มต้นได้อย่างไร:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... และด้วยการรวมภายในที่แตกต่างกัน ตัวอย่างเช่น ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
สรุป
ในบทช่วยสอนนี้ เราได้อธิบายแนวทางปฏิบัติที่ดีที่สุดที่ควรปฏิบัติตามเพื่อสร้างบล็อคส่วนประกอบการรวมวัตถุประสงค์ทั่วไป ซึ่งแสดงเป็นโรงงานรวม ลักษณะทั่วไปมาจากความตั้งใจในการออกแบบในสองวิธี:
- การคำนวณแบบกำหนดพารามิเตอร์ การรวมเป็นกลุ่มอาคารอิสระที่สามารถเสียบเข้ากับโมดูลอื่น ๆ ฉิบหายออกแบบมาเพื่อทำงานร่วมกับ
tff.aggregators
เพื่อ parameterize รวมที่จำเป็นของพวกเขาเช่นtff.learning.build_federated_averaging_process
- องค์ประกอบการรวม เอกสารสำเร็จรูปการรวมสามารถประกอบกับบล็อคส่วนประกอบการรวมอื่นๆ เพื่อสร้างการรวมแบบผสมที่ซับซ้อนมากขึ้น