Wdrażanie agregacji niestandardowych

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

W tym poradniku wyjaśniamy założeń projektowych w tff.aggregators modułu i najlepszych praktyk w zakresie wdrażania niestandardowych agregacji wartości od klientów do serwera.

Warunki wstępne. Ten poradnik zakłada, że są już zaznajomieni z podstawowymi pojęciami Federalne Rdzenia takich jak docelowych ( tff.SERVER , tff.CLIENTS ), jak TFF reprezentuje obliczeń ( tff.tf_computation , tff.federated_computation ) i ich podpisami typu.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Podsumowanie projektu

W TFF „agregacja” odnosi się do przepływu zbioru wartości na tff.CLIENTS produkować łącznej wartości tego samego typu tff.SERVER . Oznacza to, że każda indywidualna wartość klienta nie musi być dostępna. Na przykład w uczeniu sfederowanym aktualizacje modelu klienta są uśredniane, aby uzyskać aktualizację modelu zagregowanego do zastosowania do modelu globalnego na serwerze.

Oprócz podmiotów realizujących ten cel, takich jak tff.federated_sum , TFF zapewnia tff.templates.AggregationProcess (a Stateful procesowego ), która formalizuje podpis typu dla obliczeń agregacji może więc uogólnić do bardziej złożonych form niż prosta suma.

Głównymi składnikami tff.aggregators modułu są fabryki do tworzenia AggregationProcess , które są przeznaczone do ogólnie użyteczne i Wymienny budulcem TFF w dwóch aspektach:

  1. Obliczenia parametryczne. Agregacja jest niezależny blok budynek, który można podłączyć do innych modułów TFF zaprojektowany do pracy z tff.aggregators parameterize ich niezbędnego agregacji.

Przykład:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Skład agregacyjny. Blok budulcowy agregacji można skomponować z innymi blokami budulcowymi agregacji w celu utworzenia bardziej złożonych agregacji złożonych.

Przykład:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Pozostała część tego samouczka wyjaśnia, w jaki sposób osiąga się te dwa cele.

Proces agregacji

My pierwsze podsumowania tff.templates.AggregationProcess i postępuj zgodnie ze wzorem fabryki do jej stworzenia.

tff.templates.AggregationProcess jest tff.templates.MeasuredProcess z podpisami typu określonego dla agregacji. W szczególności, initialize i next funkcje mają podpisy następujące rodzaje:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Stan (typu state_type ) musi być umieszczony na serwerze. next funkcja przyjmuje jako argument wejściowy stan i wartość należy agregować (typu value_type ) umieszczony na klientów. W * oznacza ewentualnie inne argumenty wejściowe, na przykład wag w średniej ważonej. Zwraca zaktualizowany obiekt stanu, zagregowaną wartość tego samego typu umieszczoną na serwerze oraz niektóre pomiary.

Należy pamiętać, że oba państwa mają być przekazywane między egzekucji na next funkcji, a zgłoszone pomiary przeznaczony do zgłaszania wszelkich informacji w zależności od konkretnej egzekucji w next funkcji, może być pusta. Niemniej jednak muszą one być wyraźnie określone, aby inne części TFF miały jasną umowę do przestrzegania.

Inne moduły TFF, na przykład aktualizacji modelu w tff.learning oczekuje się, aby skorzystać z tff.templates.AggregationProcess parameterize jak wartości są sumowane. Jednak to, czym dokładnie są agregowane wartości i jakie są ich sygnatury typów, zależy od innych szczegółów trenowanego modelu i zastosowanego do tego algorytmu uczenia.

Aby dokonać agregacji niezależny od innych aspektów obliczeń używamy wzorca Factory - tworzymy odpowiednią tff.templates.AggregationProcess raz stosowne podpisy typu obiektów mają być agregowane są dostępne, powołując się na create metody fabryki. Bezpośrednia obsługa procesu agregacji jest więc potrzebna tylko autorom biblioteki, którzy są odpowiedzialni za tę kreację.

Fabryki procesów agregacji

Istnieją dwie abstrakcyjne klasy fabryk podstawowych dla agregacji nieważonej i ważonej. Ich create metoda bierze podpisów typ wartości mają być agregowane i zwraca tff.templates.AggregationProcess agregacji takich wartości.

Proces utworzony przez tff.aggregators.UnweightedAggregationFactory ma dwa argumenty wejściowe: (1) stan w serwerze oraz (2) wartości określonego typu value_type .

Przykładem jest realizacja tff.aggregators.SumFactory .

Proces utworzony przez tff.aggregators.WeightedAggregationFactory trzy argumenty wejściowych (1) stan w serwerze, (2) wartość określonego typu value_type i (3) masy typu weight_type , określone przez użytkownika, w fabryce, gdy wywołanie jego create metody.

Przykładem jest realizacja tff.aggregators.MeanFactory który oblicza średnią ważoną.

Wzorzec fabryki to sposób, w jaki osiągamy pierwszy cel, o którym mowa powyżej; ta agregacja jest niezależnym elementem składowym. Na przykład podczas zmiany zmiennych modelu, które można wytrenować, złożona agregacja niekoniecznie musi być zmieniona; fabryka reprezentujący go zostanie wywołany z innego podpisu typu stosowany metodą taką jak tff.learning.build_federated_averaging_process .

Kompozycje

Przypomnijmy, że ogólny proces agregacji może obejmować (a) pewne wstępne przetwarzanie wartości u klientów, (b) przenoszenie wartości z klienta na serwer oraz (c) pewne przetwarzanie końcowe zagregowanej wartości na serwerze. Drugim celem wspomniano powyżej, kompozycja agregacji, jest realizowany Wewnątrz tff.aggregators modułem strukturyzacji wdrażania fabrykach agregacji takich, że część (b) mogą zostać przekazane innej fabryce agregacji.

Zamiast implementować całą niezbędną logikę w jednej klasie fabryki, implementacje są domyślnie skoncentrowane na jednym aspekcie dotyczącym agregacji. W razie potrzeby ten wzór umożliwia nam wymianę bloków konstrukcyjnych pojedynczo.

Przykładem jest ważona tff.aggregators.MeanFactory . Jego implementacja mnoży podane wartości i wagi u klientów, następnie sumuje niezależnie zarówno wartości ważone, jak i wagi, a następnie dzieli sumę wartości ważonych przez sumę wag na serwerze. Zamiast realizacji sumowanie bezpośrednio za pomocą tff.federated_sum operatora, podsumowanie jest przekazane do dwóch przypadkach tff.aggregators.SumFactory .

Taka konstrukcja umożliwia zastąpienie dwóch domyślnych sum przez różne fabryki, które w różny sposób realizują sumę. Na przykład, tff.aggregators.SecureSumFactory lub realizacja zwyczaj tff.aggregators.UnweightedAggregationFactory . Odwrotnie, czas, tff.aggregators.MeanFactory sama może być wewnętrzny agregacja innej fabryce, takich jak tff.aggregators.clipping_factory , jeśli wartości mają być obcięte przed uśredniania.

Zobacz poprzedni Tuning zaleca agregacji do nauki poradnik dla receommended wykorzystania mechanizmu składu przy użyciu istniejących fabryk w tff.aggregators modułu.

Najlepsze praktyki na przykładzie

Jedziemy do zilustrowania tff.aggregators pojęć w szczegółach poprzez wdrożenie prosty przykład zadania, i sprawiają, że coraz bardziej ogólne. Innym sposobem nauki jest spojrzenie na wdrażanie istniejących fabryk.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Zamiast zsumowanie value , przykład zadaniem jest suma value * 2.0 , a następnie podzielić przez sumę 2.0 . Wynik agregacji zatem matematycznie równoważne bezpośrednio zsumowanie value i mogą być traktowane jako składające się z trzech części (1), skalowanie do klientów (2) podsumowujący całej klientów (3) unscaling na serwerze.

Według projektu wyjaśniono powyżej, logika będą realizowane jako podklasa tff.aggregators.UnweightedAggregationFactory , która tworzy odpowiednią tff.templates.AggregationProcess gdy dany jest value_type aby łączna:

Minimalna implementacja

Dla przykładowego zadania niezbędne obliczenia są zawsze takie same, więc nie ma potrzeby używania stanu. Jest więc pusty i reprezentowane tff.federated_value((), tff.SERVER) . Na razie to samo dotyczy pomiarów.

Minimalna realizacja zadania jest więc następująca:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

To, czy wszystko działa zgodnie z oczekiwaniami, można zweryfikować za pomocą następującego kodu:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Stanowość i pomiary

Statefulness jest szeroko stosowany w TFF do reprezentowania obliczeń, które mają być wykonywane iteracyjnie i zmieniają się z każdą iteracją. Na przykład stan obliczania uczenia zawiera wagi uczonego modelu.

Aby zilustrować, jak używać stanu w obliczeniach agregacji, modyfikujemy przykładowe zadanie. Zamiast pomnożenie value przez 2.0 , mnożymy to przez indeks iteracji - ile razy agregacja został wykonany.

Aby to zrobić, potrzebujemy sposobu na śledzenie indeksu iteracji, który jest osiągany dzięki koncepcji stanu. W initialize_fn , zamiast tworzenia pustego stanu, możemy zainicjować stan zerowy być skalarne. Następnie stan może być stosowany w next_fn na trzy etapy: (1) przez przyrostowe 1.0 , (2) stosowanie do mnożenia value , (3), obie jako nowy stan aktualizowane.

Gdy to nastąpi, można zauważyć: Ale dokładnie tak samo jak powyżej kod może być użyty do sprawdzenia wszystko działa zgodnie z oczekiwaniami. Skąd mam wiedzieć, że coś się rzeczywiście zmieniło?

Dobre pytanie! Tu przydaje się koncepcja pomiarów. W ogóle, pomiary mogą zgłaszać dowolną wartość odpowiednią do pojedynczego wykonania next czynności, które mogłyby być użyte do monitorowania. W tym przypadku może to być summed_value z poprzedniego przykładu. To znaczy wartość przed krokiem „unskalowania”, która powinna zależeć od indeksu iteracji. Ponownie, niekoniecznie jest to przydatne w praktyce, ale ilustruje odpowiedni mechanizm.

Stanowa odpowiedź na zadanie wygląda więc następująco:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Należy zauważyć, że state , który przychodzi do next_fn jako wejście znajduje się na serwerze. W celu wykorzystania go do klientów, to najpierw musi zostać przekazana, co uzyskuje się stosując tff.federated_broadcast operatora.

Aby zweryfikować wszystkie prace zgodnie z oczekiwaniami, możemy teraz spojrzeć na raportowanych measurements , które powinny być różne w każdej rundzie wykonania, nawet jeśli prowadzony z tej samej client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Typy strukturalne

Wagi modelu uczonego w uczeniu federacyjnym są zwykle reprezentowane jako zbiór tensorów, a nie pojedynczy tensor. W TFF, jest to reprezentowane tff.StructType i ogólnie użyteczne fabryki agregacji muszą być w stanie zaakceptować strukturyzowanych typy.

Jednak w powyższych przykładach, tylko pracowaliśmy z tff.TensorType obiektu. Jeśli staramy się korzystać z poprzedniej fabryki stworzyć proces agregacji z tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , otrzymujemy dziwny błąd, ponieważ TensorFlow spróbuje pomnożyć tf.Tensor i list .

Problem polega na tym, że zamiast mnożenia strukturę tensorów przez stałą, musimy pomnożyć każdy tensor w strukturze przez stałą. Typowym rozwiązaniem tego problemu jest zastosowanie tf.nest moduł wnętrza utworzonych tff.tf_computation s.

Wersja poprzedniego ExampleTaskFactory kompatybilnym z strukturyzowanych typów wygląda więc następująco:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Ten przykład podkreśla wzorzec, który może być przydatny podczas konstruowania kodu TFF. Kiedy nie czynienia z bardzo prostych operacji, kod staje się bardziej czytelne, gdy tff.tf_computation s, które zostaną użyte jako klocki wewnątrz tff.federated_computation tworzone są w osobnym miejscu. Wewnątrz tff.federated_computation te klocki są połączone tylko za pomocą swoistych operatorów.

Aby sprawdzić, czy działa zgodnie z oczekiwaniami:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Agregacje wewnętrzne

Ostatnim krokiem jest opcjonalne umożliwienie delegowania rzeczywistej agregacji do innych fabryk, aby umożliwić łatwe tworzenie różnych technik agregacji.

Osiąga się to poprzez stworzenie dodatkowego inner_factory argument konstruktora naszej ExampleTaskFactory . Jeśli nie podano, tff.aggregators.SumFactory jest używany, który stosuje tff.federated_sum operatora używany bezpośrednio w poprzedniej części.

Kiedy create nazywa, możemy najpierw zadzwonić create z inner_factory stworzyć wewnętrzny proces agregacji z tego samego value_type .

Stan naszego procesu zwrócony przez initialize_fn jest połączenie dwóch części: w stanie utworzonym przez „to” procesu i stanu właśnie utworzonego procesu wewnętrznego.

Realizacja next_fn różni się tym rzeczywistej agregacji jest delegowana do next funkcji procesu wewnętrznego, w jaki sposób ostateczny skład wyjściowy. Stan ponownie składa się z „to” i „wewnętrzny” stanu, a pomiary są zbudowane w sposób podobny jak w OrderedDict .

Poniżej znajduje się implementacja takiego wzorca.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Po przekazaniu do inner_process.next funkcji, struktury zamian otrzymujemy jest tff.templates.MeasuredProcessOutput , z tych samych trzech dziedzinach - state , result i measurements . Tworząc ogólną strukturę powrotny składa się proces agregacji, że state i measurements pola powinny być na ogół składa się ze sobą i zwrócone. Natomiast result odpowiada pole do wartości są sumowane i zamiast „płynie” w złożonej agregacji.

state obiektu powinny być postrzegane jako szczegółach realizacji fabryki, a tym samym skład może być dowolnej strukturze. Jednak measurements odpowiadają wartości należy podać użytkownikowi w pewnym momencie. Dlatego zalecamy stosowanie OrderedDict , składa się z nazw, tak więc powinno być jasne, gdzie w kompozycji jest zgłaszane metryczny pochodzi.

Należy również zwrócić uwagę na wykorzystanie tff.federated_zip operatora. state obiektu contolled stworzonej przez proces powinien być tff.FederatedType . Gdybyśmy zamiast powrócił (this_state, inner_state) w initialize_fn , jego podpis typ zwracany byłby tff.StructType zawierający 2-krotka tff.FederatedType s. Zastosowanie tff.federated_zip „windy” z tff.FederatedType do poziomu górnego. Podobnie jest stosowany w next_fn przy wytwarzaniu stanu i pomiary zostały zwrócone.

Na koniec możemy zobaczyć, jak można to wykorzystać z domyślną agregacją wewnętrzną:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... i z inną wewnętrzną agregacją. Na przykład, ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Streszczenie

W tym samouczku wyjaśniliśmy najlepsze praktyki, których należy przestrzegać, aby utworzyć blok konstrukcyjny agregacji ogólnego przeznaczenia, reprezentowany jako fabryka agregacji. Ogólność wynika z intencji projektowych na dwa sposoby:

  1. Obliczenia parametryczne. Agregacja jest niezależny blok budynek, który można podłączyć do innych modułów TFF zaprojektowany do pracy z tff.aggregators parameterize ich niezbędnego agregacji, takich jak tff.learning.build_federated_averaging_process .
  2. Skład agregacyjny. Blok budulcowy agregacji można skomponować z innymi blokami budulcowymi agregacji w celu utworzenia bardziej złożonych agregacji złożonych.