Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
Dalam tutorial ini, kami akan menjelaskan prinsip-prinsip desain belakang tff.aggregators
modul dan praktik terbaik untuk menerapkan agregasi kustom dari nilai-nilai dari klien ke server.
Prasyarat. Tutorial ini mengasumsikan Anda sudah akrab dengan konsep dasar Federasi Inti seperti penempatan ( tff.SERVER
, tff.CLIENTS
), bagaimana TFF merupakan perhitungan ( tff.tf_computation
, tff.federated_computation
) dan jenis tanda tangan mereka.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
Ringkasan desain
Dalam TFF, "agregasi" mengacu pada pergerakan seperangkat nilai-nilai pada tff.CLIENTS
untuk menghasilkan nilai agregat dari jenis yang sama pada tff.SERVER
. Artinya, setiap nilai klien individu tidak perlu tersedia. Misalnya dalam pembelajaran gabungan, pembaruan model klien dirata-ratakan untuk mendapatkan pembaruan model agregat untuk diterapkan ke model global di server.
Selain operator mencapai tujuan ini seperti tff.federated_sum
, TFF menyediakan tff.templates.AggregationProcess
(a proses stateful ) yang meresmikan tanda tangan jenis untuk agregasi perhitungan sehingga dapat menggeneralisasi ke bentuk yang lebih kompleks daripada jumlah yang sederhana.
Komponen utama dari tff.aggregators
modul adalah pabrik-pabrik untuk penciptaan AggregationProcess
, yang dirancang untuk secara umum berguna dan replacable blok bangunan dari TFF dalam dua aspek:
- Perhitungan parameter. Agregasi merupakan blok bangunan independen yang dapat dipasang ke modul TFF lain yang dirancang untuk bekerja dengan
tff.aggregators
untuk parameterisasi agregasi diperlukan mereka.
Contoh:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- Komposisi agregasi. Sebuah blok bangunan agregasi dapat disusun dengan blok bangunan agregasi lainnya untuk membuat agregasi komposit yang lebih kompleks.
Contoh:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
Sisa dari tutorial ini menjelaskan bagaimana kedua tujuan ini tercapai.
Proses agregasi
Kami pertama merangkum tff.templates.AggregationProcess
, dan ikuti dengan pola pabrik untuk penciptaan.
The tff.templates.AggregationProcess
adalah tff.templates.MeasuredProcess
dengan jenis tanda tangan yang ditentukan untuk agregasi. Secara khusus, initialize
dan next
fungsi memiliki tipe berikut tanda tangan:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
Negara (tipe state_type
) harus ditempatkan di server yang. The next
fungsi mengambil sebagai argumen masukan negara dan nilai yang akan digabungkan (tipe value_type
) ditempatkan di klien. The *
berarti opsional argumen input lainnya, misalnya bobot dalam rata-rata tertimbang. Ini mengembalikan objek status yang diperbarui, nilai agregat dari jenis yang sama yang ditempatkan di server, dan beberapa pengukuran.
Perhatikan bahwa kedua negara akan melewati antara eksekusi dari next
fungsi, dan pengukuran dilaporkan dimaksudkan untuk melaporkan informasi tergantung pada pelaksanaan tertentu dari next
fungsi, mungkin kosong. Namun demikian, mereka harus secara eksplisit ditentukan agar bagian lain dari TFF memiliki kontrak yang jelas untuk diikuti.
Modul TFF lain, misalnya update model dalam tff.learning
, diharapkan untuk menggunakan tff.templates.AggregationProcess
untuk parameterisasi bagaimana nilai-nilai yang dikumpulkan. Namun, apa sebenarnya nilai yang dikumpulkan dan apa jenis tanda tangannya, tergantung pada detail lain dari model yang dilatih dan algoritme pembelajaran yang digunakan untuk melakukannya.
Untuk membuat agregasi independen dari aspek-aspek lain dari perhitungan, kita menggunakan pola pabrik - kami membuat sesuai tff.templates.AggregationProcess
sekali jenis tanda tangan yang relevan dari objek yang akan dikumpulkan yang tersedia, dengan menerapkan create
metode pabrik. Oleh karena itu, penanganan langsung dari proses agregasi hanya diperlukan untuk penulis perpustakaan, yang bertanggung jawab atas pembuatan ini.
Pabrik proses agregasi
Ada dua kelas pabrik dasar abstrak untuk agregasi tak tertimbang dan tertimbang. Mereka create
metode mengambil tanda tangan jenis nilai yang akan dikumpulkan dan mengembalikan tff.templates.AggregationProcess
untuk agregasi dari nilai-nilai tersebut.
Proses yang diciptakan oleh tff.aggregators.UnweightedAggregationFactory
membutuhkan dua argumen input: (1) negara pada server dan nilai tipe tertentu (2) value_type
.
Contoh implementasi adalah tff.aggregators.SumFactory
.
Proses yang diciptakan oleh tff.aggregators.WeightedAggregationFactory
mengambil tiga argumen input: (1) negara pada Server, nilai (2) jenis tertentu value_type
dan (3) berat jenis weight_type
, sebagaimana ditentukan oleh pengguna pabrik saat menjalankan nya create
metode.
Contoh implementasi adalah tff.aggregators.MeanFactory
yang menghitung rata-rata tertimbang.
Pola pabrik adalah bagaimana kita mencapai tujuan pertama yang disebutkan di atas; bahwa agregasi adalah blok bangunan independen. Misalnya, ketika mengubah variabel model mana yang dapat dilatih, agregasi kompleks tidak perlu diubah; pabrik yang mewakili itu akan dipanggil dengan jenis tanda tangan yang berbeda bila digunakan dengan metode seperti tff.learning.build_federated_averaging_process
.
Komposisi
Ingat bahwa proses agregasi umum dapat merangkum (a) beberapa prapemrosesan nilai di klien, (b) pergerakan nilai dari klien ke server, dan (c) beberapa pascapemrosesan nilai agregat di server. Gol kedua yang disebutkan di atas, komposisi agregasi, diwujudkan dalam tff.aggregators
modul dengan penataan pelaksanaan pabrik agregasi seperti bagian (b) dapat didelegasikan kepada pabrik agregasi lain.
Daripada menerapkan semua logika yang diperlukan dalam satu kelas pabrik, implementasi secara default difokuskan pada satu aspek yang relevan untuk agregasi. Saat dibutuhkan, pola ini kemudian memungkinkan kita untuk mengganti blok bangunan satu per satu.
Contohnya adalah tertimbang tff.aggregators.MeanFactory
. Implementasinya mengalikan nilai dan bobot yang diberikan pada klien, lalu menjumlahkan nilai bobot dan bobot secara independen, lalu membagi jumlah nilai bobot dengan jumlah bobot di server. Alih-alih menerapkan penjumlahan dengan langsung menggunakan tff.federated_sum
operator, penjumlahan tersebut didelegasikan kepada dua contoh tff.aggregators.SumFactory
.
Struktur seperti itu memungkinkan dua penjumlahan default diganti oleh pabrik yang berbeda, yang merealisasikan penjumlahan secara berbeda. Misalnya, tff.aggregators.SecureSumFactory
, atau pelaksanaan adat dari tff.aggregators.UnweightedAggregationFactory
. Sebaliknya, waktu, tff.aggregators.MeanFactory
bisa sendiri menjadi agregasi dalam dari pabrik lain seperti tff.aggregators.clipping_factory
, jika nilai-nilai yang harus dipotong sebelum averaging.
Lihat sebelumnya Tuning direkomendasikan agregasi untuk belajar tutorial untuk penggunaan receommended dari mekanisme komposisi menggunakan pabrik-pabrik yang ada di tff.aggregators
modul.
Praktik terbaik dengan contoh
Kita akan menggambarkan tff.aggregators
konsep secara detail dengan menerapkan contoh tugas sederhana, dan membuatnya semakin lebih umum. Cara lain untuk belajar adalah dengan melihat implementasi pabrik-pabrik yang ada.
import collections
import tensorflow as tf
import tensorflow_federated as tff
Alih-alih menjumlahkan value
, contoh tugas adalah untuk jumlah value * 2.0
dan kemudian membagi jumlah tersebut dengan 2.0
. Hasil agregasi demikian matematis setara dengan langsung menjumlahkan value
, dan bisa dianggap sebagai terdiri dari tiga bagian: (1) skala pada klien (2) menjumlahkan seluruh klien (3) unscaling di Server.
Berikut desain dijelaskan di atas, logika akan dilaksanakan sebagai subclass dari tff.aggregators.UnweightedAggregationFactory
, yang menciptakan sesuai tff.templates.AggregationProcess
ketika diberi value_type
ke agregat:
Implementasi minimal
Untuk tugas contoh, perhitungan yang diperlukan selalu sama, jadi tidak perlu menggunakan status. Demikian kosong, dan direpresentasikan sebagai tff.federated_value((), tff.SERVER)
. Hal yang sama berlaku untuk pengukuran, untuk saat ini.
Implementasi minimal dari tugas tersebut adalah sebagai berikut:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Apakah semuanya berfungsi seperti yang diharapkan dapat diverifikasi dengan kode berikut:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
Statefulness dan pengukuran
Statefulness secara luas digunakan dalam TFF untuk mewakili perhitungan yang diharapkan dieksekusi secara iteratif dan berubah dengan setiap iterasi. Misalnya, keadaan komputasi pembelajaran berisi bobot model yang dipelajari.
Untuk mengilustrasikan cara menggunakan status dalam komputasi agregasi, kami memodifikasi tugas contoh. Alih-alih mengalikan value
dengan 2.0
, kita kalikan dengan indeks iterasi - jumlah kali agregasi telah dieksekusi.
Untuk melakukannya, kita memerlukan cara untuk melacak indeks iterasi, yang dicapai melalui konsep keadaan. Dalam initialize_fn
, bukan menciptakan keadaan kosong, kita menginisialisasi negara menjadi nol skalar. Kemudian, negara dapat digunakan dalam next_fn
dalam tiga langkah: (1) peningkatan oleh 1.0
, (2) gunakan untuk kalikan value
, dan (3) kembali sebagai negara diperbarui baru.
Setelah ini dilakukan, Anda dapat mencatat: Tapi kode yang sama persis seperti di atas dapat digunakan untuk memverifikasi semua bekerja seperti yang diharapkan. Bagaimana saya tahu sesuatu telah benar-benar berubah?
Pertanyaan bagus! Di sinilah konsep pengukuran menjadi berguna. Secara umum, pengukuran dapat melaporkan nilai yang relevan untuk eksekusi tunggal dari next
fungsi, yang dapat digunakan untuk memantau. Dalam hal ini, dapat menjadi summed_value
dari contoh sebelumnya. Artinya, nilai sebelum langkah "unscaling", yang harus bergantung pada indeks iterasi. Sekali lagi, ini tidak selalu berguna dalam praktik, tetapi menggambarkan mekanisme yang relevan.
Jawaban stateful untuk tugas tersebut terlihat sebagai berikut:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Perhatikan bahwa state
yang datang ke next_fn
sebagai masukan ditempatkan di server yang. Dalam rangka untuk menggunakannya pada klien, pertama kali perlu dikomunikasikan, yang dicapai dengan menggunakan tff.federated_broadcast
operator.
Untuk memverifikasi semua bekerja seperti yang diharapkan, kita sekarang dapat melihat dilaporkan measurements
, yang harus berbeda dengan setiap putaran eksekusi, bahkan jika dijalankan dengan sama client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
Tipe terstruktur
Bobot model dari model yang dilatih dalam pembelajaran federasi biasanya direpresentasikan sebagai kumpulan tensor, bukan tensor tunggal. Dalam TFF, ini diwakili sebagai tff.StructType
dan pabrik agregasi biasanya berguna harus mampu menerima jenis terstruktur.
Namun, pada contoh di atas, kita hanya bekerja dengan tff.TensorType
objek. Jika kita mencoba untuk menggunakan pabrik sebelumnya untuk membuat proses agregasi dengan tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
, kita mendapatkan error aneh karena TensorFlow akan mencoba untuk memperbanyak suatu tf.Tensor
dan list
.
Masalahnya adalah bahwa alih-alih mengalikan struktur tensor dengan konstan, kita perlu kalikan masing-masing tensor dalam struktur dengan konstan. Solusi yang biasa untuk masalah ini adalah dengan menggunakan tf.nest
di dalam modul yang dibuat tff.tf_computation
s.
Versi sebelumnya ExampleTaskFactory
kompatibel dengan jenis terstruktur sehingga terlihat sebagai berikut:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Contoh ini menyoroti pola yang mungkin berguna untuk diikuti saat menyusun kode TFF. Ketika tidak berurusan dengan operasi yang sangat sederhana, kode menjadi lebih mudah dibaca ketika tff.tf_computation
s yang akan digunakan sebagai blok bangunan di dalam tff.federated_computation
diciptakan di tempat terpisah. Bagian dalam tff.federated_computation
, blok bangunan ini hanya terhubung menggunakan operator intrinsik.
Untuk memverifikasi itu berfungsi seperti yang diharapkan:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
Agregasi dalam
Langkah terakhir adalah secara opsional memungkinkan pendelegasian agregasi aktual ke pabrik lain, untuk memungkinkan komposisi yang mudah dari teknik agregasi yang berbeda.
Hal ini dicapai dengan menciptakan opsional inner_factory
argumen dalam konstruktor dari kami ExampleTaskFactory
. Jika tidak ditentukan, tff.aggregators.SumFactory
digunakan, yang menerapkan tff.federated_sum
operator yang digunakan langsung pada bagian sebelumnya.
Ketika create
disebut, pertama kita dapat memanggil create
dari inner_factory
untuk membuat proses agregasi batin dengan sama value_type
.
Keadaan proses kami dikembalikan oleh initialize_fn
adalah komposisi dari dua bagian: negara yang diciptakan oleh "ini" proses, dan negara dari proses batin yang baru saja dibuat.
Pelaksanaan next_fn
berbeda dalam bahwa agregasi yang sebenarnya didelegasikan kepada next
fungsi dari proses batin, dan bagaimana hasil akhir terdiri. Negara ini lagi terdiri dari "ini" dan "batin" negara, dan pengukuran terdiri dengan cara yang sama sebagai OrderedDict
.
Berikut ini adalah implementasi dari pola tersebut.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Ketika mendelegasikan ke inner_process.next
fungsi, struktur kembali kami dapatkan adalah tff.templates.MeasuredProcessOutput
, dengan tiga bidang yang sama - state
, result
dan measurements
. Ketika membuat struktur keseluruhan laba dari proses agregasi terdiri, para state
dan measurements
bidang harus umumnya terdiri dan kembali bersama-sama. Sebaliknya, result
berkorespondensi lapangan untuk nilai yang dikumpulkan dan bukannya "mengalir melalui" agregasi terdiri.
The state
objek harus dilihat sebagai detil implementasi pabrik, dan dengan demikian komposisi bisa menjadi struktur apapun. Namun, measurements
sesuai dengan nilai-nilai yang akan dilaporkan kepada pengguna di beberapa titik. Oleh karena itu, kami sarankan untuk menggunakan OrderedDict
, dengan terdiri penamaan sehingga akan jelas mana dalam komposisi tidak dilaporkan metrik berasal dari.
Perhatikan juga penggunaan tff.federated_zip
operator. The state
objek contolled oleh proses menciptakan harus tff.FederatedType
. Jika kita telah bukannya kembali (this_state, inner_state)
di initialize_fn
, kembali jenis tanda tangan akan menjadi tff.StructType
yang berisi 2-tupel dari tff.FederatedType
s. Penggunaan tff.federated_zip
"lift" yang tff.FederatedType
ke tingkat atas. Ini sama digunakan dalam next_fn
ketika mempersiapkan negara dan pengukuran untuk dikembalikan.
Akhirnya, kita dapat melihat bagaimana ini dapat digunakan dengan agregasi internal default:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... dan dengan agregasi dalam yang berbeda. Sebagai contoh, sebuah ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
Ringkasan
Dalam tutorial ini, kami menjelaskan praktik terbaik yang harus diikuti untuk membuat blok penyusun agregasi tujuan umum, yang direpresentasikan sebagai pabrik agregasi. Keumuman datang melalui maksud desain dalam dua cara:
- Perhitungan parameter. Agregasi merupakan blok bangunan independen yang dapat dipasang ke modul TFF lain yang dirancang untuk bekerja dengan
tff.aggregators
untuk parameterisasi agregasi yang diperlukan, sepertitff.learning.build_federated_averaging_process
. - Komposisi agregasi. Sebuah blok bangunan agregasi dapat disusun dengan blok bangunan agregasi lainnya untuk membuat agregasi komposit yang lebih kompleks.