사용자 정의 집계 구현

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

이 튜토리얼에서, 우리는 디자인 뒤에서 원리를 설명 tff.aggregators 모듈과 클라이언트에서 서버로 값을 사용자 정의 집계를 구현하기위한 최상의 방법을.

전제 조건. 이 튜토리얼은 당신이 이미의 기본 개념을 잘 알고있는 가정 연합 코어 등의 게재 위치에 (로 tff.SERVER , tff.CLIENTS TFF는 계산을 표현하는 방법), ( tff.tf_computation , tff.federated_computation )과 유형 서명.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

디자인 요약

TFF에서 "응집"에 대한 값들의 세트의 이동을 지칭 tff.CLIENTS 동일한 유형의 집계 값을 생성하기 tff.SERVER . 즉, 각각의 개별 클라이언트 값을 사용할 수 있을 필요는 없습니다. 예를 들어 연합 학습에서 클라이언트 모델 업데이트는 평균을 내어 서버의 전역 모델에 적용할 집계 모델 업데이트를 얻습니다.

이 같은 목적을 달성 연산자 이외에도 tff.federated_sum , TFF가 제공 tff.templates.AggregationProcess (a 안정된 공정 이 단순한 합보다 더 복잡한 형태를 일반화 할 수 있도록 집합 연산의 형식 서명을 공식화).

의 주성분 tff.aggregators 의 제작을위한 모듈이다 공장 AggregationProcess 개의 TFF 측면에서 일반적으로 유용하고 대체 가능 빌딩 블록으로 설계된다 :

  1. 매개변수화된 계산. 집계와 함께 작동하도록 설계된 다른 TFF 모듈에 연결 될 수있는 독립적 인 빌딩 블록 tff.aggregators 그들의 필요한 집계를 파라미터 화.

예시:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. 집계 구성. 집계 빌딩 블록은 다른 집계 빌딩 블록과 함께 구성되어 더 복잡한 복합 집계를 생성할 수 있습니다.

예시:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

이 튜토리얼의 나머지 부분에서는 이 두 가지 목표를 달성하는 방법을 설명합니다.

집계 프로세스

우리는 먼저 요약 tff.templates.AggregationProcess , 그 창작 팩토리 패턴을 따릅니다.

tff.templates.AggregationProcess 입니다 tff.templates.MeasuredProcess 집계 지정된 유형의 서명. 특히, initializenext 기능은 다음과 같은 유형의 서명이 :

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

(유형의 상태 state_type ) 서버에 위치해야합니다. next 함수는 상태 값 (A 타입의 집계에 입력 인수로서 얻어 value_type 클라이언트에 위치). * 수단은 가중 평균의 인스턴스 무게를 들어, 다른 입력 인수를 선택. 업데이트된 상태 개체, 서버에 배치된 동일한 유형의 집계 값 및 일부 측정값을 반환합니다.

국가 모두의 실행 사이에 전달되는 것을 참고 next 기능,보고 된 측정이 특정 실행에 따라 정보보고하도록 next 기능을 비어있을 수 있습니다. 그럼에도 불구하고 TFF의 다른 부분이 따라야 할 명확한 계약을 갖도록 명시적으로 지정해야 합니다.

다른 TFF 모듈은, 인스턴스의 모델 업데이트 tff.learning , 사용하는 것으로 예상된다 tff.templates.AggregationProcess 값을 집계하는 방법을 매개 변수화 할 수 있습니다. 그러나 집계된 값과 유형 서명이 정확히 무엇인지는 훈련되는 모델의 기타 세부 사항과 이를 수행하는 데 사용되는 학습 알고리즘에 따라 다릅니다.

계산의 다른 측면의 집계 독립을 위해, 우리는 공장 패턴을 사용 - 우리가 적절한 만들 tff.templates.AggregationProcess 객체의 관련 유형 서명이 (가) 호출하여, 집계 가능 할 한 번 create 공장 방법. 따라서 집계 프로세스의 직접 처리는 이 생성을 담당하는 라이브러리 작성자에게만 필요합니다.

집계 프로세스 공장

가중치가 없는 집계 및 가중치 집계를 위한 두 가지 추상 기본 팩토리 클래스가 있습니다. 그들의 create 방법은 집계 할 값의 유형 서명을 취하고 반환 tff.templates.AggregationProcess 같은 값의 집계.

에 의해 생성 된 프로세스 tff.aggregators.UnweightedAggregationFactory 서버에서 (1) 상태 및 명시된 형 (2)의 값을 두 개의 입력 인자 얻어 value_type .

예를 구현이다 tff.aggregators.SumFactory .

에 의해 생성 된 프로세스 tff.aggregators.WeightedAggregationFactory 서버에서 (1) 상태를 지정 형 (2) 값 : 3 개 개의 입력 인자 얻어 value_type 및 형식 (3) 중량 weight_type 공장의 사용자 지정에 따라 호출 할 때, create 방법.

예시적인 구현이다 tff.aggregators.MeanFactory 가중 평균을 계산한다.

팩토리 패턴은 위에서 언급한 첫 번째 목표를 달성하는 방법입니다. 그 집계는 독립적인 빌딩 블록입니다. 예를 들어 훈련 가능한 모델 변수를 변경할 때 복잡한 집계를 변경할 필요가 없습니다. 같은 방법으로 사용하는 경우를 나타내는 공장은 다른 유형의 서명으로 호출됩니다 tff.learning.build_federated_averaging_process .

작곡

일반 집계 프로세스는 (a) 클라이언트에서 값의 일부 전처리, (b) 클라이언트에서 서버로 값 이동, (c) 서버에서 집계된 값의 일부 후처리를 캡슐화할 수 있음을 상기하십시오. 공진 영역 응집 조성물, 상술 한 두 번째 골이 실현된다 tff.aggregators 구조에 의해 부품 (b)이 서로 응집 공장에 위임 될 수있는 응집 공장 구현 모듈을 포함한다.

단일 팩토리 클래스 내에서 필요한 모든 로직을 구현하는 대신 구현은 기본적으로 집계와 관련된 단일 측면에 중점을 둡니다. 필요한 경우 이 패턴을 통해 빌딩 블록을 한 번에 하나씩 교체할 수 있습니다.

예는 가중 tff.aggregators.MeanFactory . 그 구현은 클라이언트에서 제공된 값과 가중치를 곱한 다음 가중치와 가중치를 개별적으로 합산한 다음 가중치 합을 서버에서 가중치 합으로 나눕니다. 대신에 직접 사용하여 합산 구현 tff.federated_sum 연산자를 합산 두 인스턴스에 위임 tff.aggregators.SumFactory .

이러한 구조를 통해 두 개의 기본 합계가 합계를 다르게 실현하는 다른 팩토리로 대체될 수 있습니다. 예를 들어, tff.aggregators.SecureSumFactory , 또는의 사용자 지정 구현 tff.aggregators.UnweightedAggregationFactory . 반대로, 시간, tff.aggregators.MeanFactory 자체 등 다른 공장의 내부 응집 될 수 tff.aggregators.clipping_factory 값을 평균화하기 전에 클립 할 경우.

이전 참조 튜닝 학습에 대한 집계를 권장 에서 기존 공장을 사용하여 구성 메커니즘의 receommended 사용에 대한 자습서를 tff.aggregators 모듈.

모범 사례를 통한 모범 사례

우리는 설명하려고 tff.aggregators 간단한 예제 작업을 구현하여 상세하게 개념을하고 점진적으로 더 일반적인합니다. 또 다른 학습 방법은 기존 공장의 구현을 살펴보는 것입니다.

import collections
import tensorflow as tf
import tensorflow_federated as tff

대신 합산 value , 예 태스크 합산하는 value * 2.0 다음으로 합계 분할 2.0 . 집계 결과를 합산 따라서 직접 수학적으로 등가 인 value 과 세 부분으로 이루어진 것으로 생각할 수있다 : 서버에 unscaling 클라이언트 (3)에 걸쳐 합산 클라이언트 (1) 스케일링 (2).

디자인은 위에서 설명한 다음, 논리의 서브 클래스로 구현됩니다 tff.aggregators.UnweightedAggregationFactory 적절한 생성, tff.templates.AggregationProcess 주어진 value_type 집계로를 :

최소한의 구현

예제 작업의 경우 필요한 계산은 항상 동일하므로 상태를 사용할 필요가 없습니다. 그것은 따라서 비우, 그리고으로 표시됩니다 tff.federated_value((), tff.SERVER) . 현재로서는 측정에 대해서도 마찬가지입니다.

따라서 작업의 최소 구현은 다음과 같습니다.

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

모든 것이 예상대로 작동하는지 여부는 다음 코드로 확인할 수 있습니다.

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

상태 유지 및 측정

상태 저장은 반복적으로 실행되고 각 반복마다 변경될 것으로 예상되는 계산을 나타내기 위해 TFF에서 광범위하게 사용됩니다. 예를 들어 학습 계산 상태에는 학습 중인 모델의 가중치가 포함됩니다.

집계 계산에서 상태를 사용하는 방법을 설명하기 위해 예제 작업을 수정합니다. 대신 곱의 value2.0 , 우리는 곱셈이 반복 인덱스 - 횟수 집계가 실행되었습니다.

그렇게 하려면 상태 개념을 통해 달성되는 반복 인덱스를 추적하는 방법이 필요합니다. 에서 initialize_fn 대신 비어있는 상태를 만드는, 우리는 스칼라 제로로 상태를 초기화합니다. 그리고,이 상태에서 사용할 수 next_fn 1로 증분 세 단계 1.0 곱하기 (2)를 사용하여 value , 새로운 갱신 된 상태 (3) 창.

이 작업이 완료되면, 당신은주의 수도 :하지만 정확히 위와 같은 코드는 예상대로 모든 일을 확인하는 데 사용할 수 있습니다. 실제로 변경된 사항이 있는지 어떻게 알 수 있습니까?

좋은 질문! 여기에서 측정 개념이 유용해집니다. 일반적으로, 측정은 단일 실행에 관련된 값보고 할 수 있습니다 next 모니터링에 사용할 수있는 기능. 이 경우, 될 수 summed_value 앞의 예에서. 즉, "unscaling" 단계 이전의 값으로, 반복 인덱스에 따라 달라집니다. 다시 말하지만, 이것은 실제로 반드시 유용하지는 않지만 관련 메커니즘을 보여줍니다.

따라서 작업에 대한 상태 저장 답변은 다음과 같습니다.

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

있습니다 state 로 제공 next_fn 입력으로 서버에 배치됩니다. 클라이언트에서 사용하기 위해서는, 먼저 사용하여 달성되는, 통신 될 필요 tff.federated_broadcast 오퍼레이터.

예상대로 모든 일을 확인하기 위해, 우리가 지금보고에서 볼 수 measurements , 실행의 각 라운드와 달라야합니다, 경우에도 같은과 실행 client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

구조화된 유형

연합 학습에서 훈련된 모델의 모델 가중치는 일반적으로 단일 텐서가 아닌 텐서 모음으로 표시됩니다. TFF, 이것은로 표시됩니다 tff.StructType 일반적으로 유용한 집계 공장은 구조화 된 유형을 수용 할 수 있어야합니다.

그러나, 위의 예에서, 우리는 단지 함께 일 tff.TensorType 객체입니다. 우리가 함께 집계 프로세스를 생성하기 이전 공장을 사용하려고하면 tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , 우리는 이상한 오류로 인해 얻을 TensorFlow는 번식을 시도합니다 tf.Tensorlist .

문제는 아닌 상수의 구조 텐서 곱, 우리는 상수 구조의 각 텐서 곱해야한다는 것이다. 이 문제에 대한 일반적인 해결책은 사용하는 tf.nest 생성 모듈의 내부 tff.tf_computation 들.

이전의 버전 ExampleTaskFactory 다음과 같이 구조화 된 유형과 호환 이렇게 본다 :

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

이 예제는 TFF 코드를 구성할 때 따라야 할 유용한 패턴을 강조합니다. 매우 간단한 작업을 처리하지 않을 경우 경우, 코드는보다 읽기하게 tff.tf_computation , 안쪽 블록 건물로 사용됩니다의 tff.federated_computation 별도의 장소에 생성됩니다. 내부의 tff.federated_computation ,이 빌딩 블록은 고유 연산자를 사용하여 연결되어 있습니다.

예상대로 작동하는지 확인하려면:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

내부 집계

마지막 단계는 다른 집계 기술을 쉽게 구성할 수 있도록 선택적으로 실제 집계를 다른 팩토리에 위임할 수 있도록 하는 것입니다.

이 옵션으로 생성함으로써 달성 inner_factory 우리의 생성자에 인수 ExampleTaskFactory . 지정되지 않으면, tff.aggregators.SumFactory 적용되는 사용 tff.federated_sum 이전 섹션에서 직접 사용할 연산자.

create 라고, 우리는 먼저 호출 할 수 있습니다 createinner_factory 같은과 내부 통합 프로세스를 만들 수 value_type .

에 의해 반환 프로세스의 상태 initialize_fn "이"프로세스에 의해 생성 된 상태, 및 방금 생성 프로세스 내부의 상태의 두 부분의 조성물이다.

의 구현 next_fn 그 실제 응집 상이는 위임되어 next 내부 처리의 기능, 최종 출력이 이루어지는 방법이다. 상태는 다시 "이"및 "내측"상태로 구성되고, 측정은와 유사한 방식으로 구성된다 OrderedDict .

다음은 그러한 패턴의 구현입니다.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

받는 사람 위임 할 때 inner_process.next 기능, 우리가 얻을 수익 구조는이다 tff.templates.MeasuredProcessOutput - 같은 세 개의 필드, state , resultmeasurements . 합성 응집 공정의 전체 창 구조를 작성하면, statemeasurements 분야는 일반적으로 구성과 함께 반환한다. 대조적으로, result 값은 대응 필드 응집되는 대신 이루어지는 응집 "흐른다".

state 객체는 공장의 구현 세부 사항으로 간주되어야하고, 따라서 조성물은 임의의 구조 일 수있다. 그러나 measurements 값이 어떤 점에서 사용자에게보고 할에 해당합니다. 따라서, 우리는 사용에 추천 OrderedDict 성분의 출처 메트릭보고 않는 경우 분명히있을 것이라고 같은 명명하여 구성된으로.

또한 사용합니다 tff.federated_zip 연산자를. state 생성 프로세스 contolled 목적은 있어야 tff.FederatedType . 우리가 대신 반환 한 경우 (this_state, inner_state)initialize_fn , 그것의 반환 유형 서명은 것 tff.StructType 의 2 튜플 포함 tff.FederatedType 들. 의 사용 tff.federated_zip "리프트"를 tff.FederatedType 최고 수준. 이는 유사하게 사용된다 next_fn 상태와 측정이 반환 될 준비 할 때.

마지막으로 이것이 기본 내부 집계와 함께 사용되는 방법을 볼 수 있습니다.

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... 내부 집계가 다릅니다. 예를 들어, ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

요약

이 자습서에서는 집계 팩토리로 표시되는 범용 집계 빌딩 블록을 만들기 위해 따라야 할 모범 사례를 설명했습니다. 일반성은 다음 두 가지 방식으로 설계 의도를 통해 나타납니다.

  1. 매개변수화된 계산. 집계와 함께 작동하도록 설계된 다른 TFF 모듈에 연결 될 수있는 독립적 인 빌딩 블록 tff.aggregators 같은 그들의 필요한 응집 매개 변수화하기 tff.learning.build_federated_averaging_process .
  2. 집계 구성. 집계 빌딩 블록은 다른 집계 빌딩 블록과 함께 구성되어 더 복잡한 복합 집계를 생성할 수 있습니다.