TensorFlow.org에서 보기 | Google Colab에서 실행 | GitHub에서 소스 보기 | 노트북 다운로드 |
이 튜토리얼에서, 우리는 디자인 뒤에서 원리를 설명 tff.aggregators
모듈과 클라이언트에서 서버로 값을 사용자 정의 집계를 구현하기위한 최상의 방법을.
전제 조건. 이 튜토리얼은 당신이 이미의 기본 개념을 잘 알고있는 가정 연합 코어 등의 게재 위치에 (로 tff.SERVER
, tff.CLIENTS
TFF는 계산을 표현하는 방법), ( tff.tf_computation
, tff.federated_computation
)과 유형 서명.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
디자인 요약
TFF에서 "응집"에 대한 값들의 세트의 이동을 지칭 tff.CLIENTS
동일한 유형의 집계 값을 생성하기 tff.SERVER
. 즉, 각각의 개별 클라이언트 값을 사용할 수 있을 필요는 없습니다. 예를 들어 연합 학습에서 클라이언트 모델 업데이트는 평균을 내어 서버의 전역 모델에 적용할 집계 모델 업데이트를 얻습니다.
이 같은 목적을 달성 연산자 이외에도 tff.federated_sum
, TFF가 제공 tff.templates.AggregationProcess
(a 안정된 공정 이 단순한 합보다 더 복잡한 형태를 일반화 할 수 있도록 집합 연산의 형식 서명을 공식화).
의 주성분 tff.aggregators
의 제작을위한 모듈이다 공장 AggregationProcess
개의 TFF 측면에서 일반적으로 유용하고 대체 가능 빌딩 블록으로 설계된다 :
- 매개변수화된 계산. 집계와 함께 작동하도록 설계된 다른 TFF 모듈에 연결 될 수있는 독립적 인 빌딩 블록
tff.aggregators
그들의 필요한 집계를 파라미터 화.
예시:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- 집계 구성. 집계 빌딩 블록은 다른 집계 빌딩 블록과 함께 구성되어 더 복잡한 복합 집계를 생성할 수 있습니다.
예시:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
이 튜토리얼의 나머지 부분에서는 이 두 가지 목표를 달성하는 방법을 설명합니다.
집계 프로세스
우리는 먼저 요약 tff.templates.AggregationProcess
, 그 창작 팩토리 패턴을 따릅니다.
tff.templates.AggregationProcess
입니다 tff.templates.MeasuredProcess
집계 지정된 유형의 서명. 특히, initialize
및 next
기능은 다음과 같은 유형의 서명이 :
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
(유형의 상태 state_type
) 서버에 위치해야합니다. next
함수는 상태 값 (A 타입의 집계에 입력 인수로서 얻어 value_type
클라이언트에 위치). *
수단은 가중 평균의 인스턴스 무게를 들어, 다른 입력 인수를 선택. 업데이트된 상태 개체, 서버에 배치된 동일한 유형의 집계 값 및 일부 측정값을 반환합니다.
국가 모두의 실행 사이에 전달되는 것을 참고 next
기능,보고 된 측정이 특정 실행에 따라 정보보고하도록 next
기능을 비어있을 수 있습니다. 그럼에도 불구하고 TFF의 다른 부분이 따라야 할 명확한 계약을 갖도록 명시적으로 지정해야 합니다.
다른 TFF 모듈은, 인스턴스의 모델 업데이트 tff.learning
, 사용하는 것으로 예상된다 tff.templates.AggregationProcess
값을 집계하는 방법을 매개 변수화 할 수 있습니다. 그러나 집계된 값과 유형 서명이 정확히 무엇인지는 훈련되는 모델의 기타 세부 사항과 이를 수행하는 데 사용되는 학습 알고리즘에 따라 다릅니다.
계산의 다른 측면의 집계 독립을 위해, 우리는 공장 패턴을 사용 - 우리가 적절한 만들 tff.templates.AggregationProcess
객체의 관련 유형 서명이 (가) 호출하여, 집계 가능 할 한 번 create
공장 방법. 따라서 집계 프로세스의 직접 처리는 이 생성을 담당하는 라이브러리 작성자에게만 필요합니다.
집계 프로세스 공장
가중치가 없는 집계 및 가중치 집계를 위한 두 가지 추상 기본 팩토리 클래스가 있습니다. 그들의 create
방법은 집계 할 값의 유형 서명을 취하고 반환 tff.templates.AggregationProcess
같은 값의 집계.
에 의해 생성 된 프로세스 tff.aggregators.UnweightedAggregationFactory
서버에서 (1) 상태 및 명시된 형 (2)의 값을 두 개의 입력 인자 얻어 value_type
.
예를 구현이다 tff.aggregators.SumFactory
.
에 의해 생성 된 프로세스 tff.aggregators.WeightedAggregationFactory
서버에서 (1) 상태를 지정 형 (2) 값 : 3 개 개의 입력 인자 얻어 value_type
및 형식 (3) 중량 weight_type
공장의 사용자 지정에 따라 호출 할 때, create
방법.
예시적인 구현이다 tff.aggregators.MeanFactory
가중 평균을 계산한다.
팩토리 패턴은 위에서 언급한 첫 번째 목표를 달성하는 방법입니다. 그 집계는 독립적인 빌딩 블록입니다. 예를 들어 훈련 가능한 모델 변수를 변경할 때 복잡한 집계를 변경할 필요가 없습니다. 같은 방법으로 사용하는 경우를 나타내는 공장은 다른 유형의 서명으로 호출됩니다 tff.learning.build_federated_averaging_process
.
작곡
일반 집계 프로세스는 (a) 클라이언트에서 값의 일부 전처리, (b) 클라이언트에서 서버로 값 이동, (c) 서버에서 집계된 값의 일부 후처리를 캡슐화할 수 있음을 상기하십시오. 공진 영역 응집 조성물, 상술 한 두 번째 골이 실현된다 tff.aggregators
구조에 의해 부품 (b)이 서로 응집 공장에 위임 될 수있는 응집 공장 구현 모듈을 포함한다.
단일 팩토리 클래스 내에서 필요한 모든 로직을 구현하는 대신 구현은 기본적으로 집계와 관련된 단일 측면에 중점을 둡니다. 필요한 경우 이 패턴을 통해 빌딩 블록을 한 번에 하나씩 교체할 수 있습니다.
예는 가중 tff.aggregators.MeanFactory
. 그 구현은 클라이언트에서 제공된 값과 가중치를 곱한 다음 가중치와 가중치를 개별적으로 합산한 다음 가중치 합을 서버에서 가중치 합으로 나눕니다. 대신에 직접 사용하여 합산 구현 tff.federated_sum
연산자를 합산 두 인스턴스에 위임 tff.aggregators.SumFactory
.
이러한 구조를 통해 두 개의 기본 합계가 합계를 다르게 실현하는 다른 팩토리로 대체될 수 있습니다. 예를 들어, tff.aggregators.SecureSumFactory
, 또는의 사용자 지정 구현 tff.aggregators.UnweightedAggregationFactory
. 반대로, 시간, tff.aggregators.MeanFactory
자체 등 다른 공장의 내부 응집 될 수 tff.aggregators.clipping_factory
값을 평균화하기 전에 클립 할 경우.
이전 참조 튜닝 학습에 대한 집계를 권장 에서 기존 공장을 사용하여 구성 메커니즘의 receommended 사용에 대한 자습서를 tff.aggregators
모듈.
모범 사례를 통한 모범 사례
우리는 설명하려고 tff.aggregators
간단한 예제 작업을 구현하여 상세하게 개념을하고 점진적으로 더 일반적인합니다. 또 다른 학습 방법은 기존 공장의 구현을 살펴보는 것입니다.
import collections
import tensorflow as tf
import tensorflow_federated as tff
대신 합산 value
, 예 태스크 합산하는 value * 2.0
다음으로 합계 분할 2.0
. 집계 결과를 합산 따라서 직접 수학적으로 등가 인 value
과 세 부분으로 이루어진 것으로 생각할 수있다 : 서버에 unscaling 클라이언트 (3)에 걸쳐 합산 클라이언트 (1) 스케일링 (2).
디자인은 위에서 설명한 다음, 논리의 서브 클래스로 구현됩니다 tff.aggregators.UnweightedAggregationFactory
적절한 생성, tff.templates.AggregationProcess
주어진 value_type
집계로를 :
최소한의 구현
예제 작업의 경우 필요한 계산은 항상 동일하므로 상태를 사용할 필요가 없습니다. 그것은 따라서 비우, 그리고으로 표시됩니다 tff.federated_value((), tff.SERVER)
. 현재로서는 측정에 대해서도 마찬가지입니다.
따라서 작업의 최소 구현은 다음과 같습니다.
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
모든 것이 예상대로 작동하는지 여부는 다음 코드로 확인할 수 있습니다.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
상태 유지 및 측정
상태 저장은 반복적으로 실행되고 각 반복마다 변경될 것으로 예상되는 계산을 나타내기 위해 TFF에서 광범위하게 사용됩니다. 예를 들어 학습 계산 상태에는 학습 중인 모델의 가중치가 포함됩니다.
집계 계산에서 상태를 사용하는 방법을 설명하기 위해 예제 작업을 수정합니다. 대신 곱의 value
가 2.0
, 우리는 곱셈이 반복 인덱스 - 횟수 집계가 실행되었습니다.
그렇게 하려면 상태 개념을 통해 달성되는 반복 인덱스를 추적하는 방법이 필요합니다. 에서 initialize_fn
대신 비어있는 상태를 만드는, 우리는 스칼라 제로로 상태를 초기화합니다. 그리고,이 상태에서 사용할 수 next_fn
1로 증분 세 단계 1.0
곱하기 (2)를 사용하여 value
, 새로운 갱신 된 상태 (3) 창.
이 작업이 완료되면, 당신은주의 수도 :하지만 정확히 위와 같은 코드는 예상대로 모든 일을 확인하는 데 사용할 수 있습니다. 실제로 변경된 사항이 있는지 어떻게 알 수 있습니까?
좋은 질문! 여기에서 측정 개념이 유용해집니다. 일반적으로, 측정은 단일 실행에 관련된 값보고 할 수 있습니다 next
모니터링에 사용할 수있는 기능. 이 경우, 될 수 summed_value
앞의 예에서. 즉, "unscaling" 단계 이전의 값으로, 반복 인덱스에 따라 달라집니다. 다시 말하지만, 이것은 실제로 반드시 유용하지는 않지만 관련 메커니즘을 보여줍니다.
따라서 작업에 대한 상태 저장 답변은 다음과 같습니다.
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
있습니다 state
로 제공 next_fn
입력으로 서버에 배치됩니다. 클라이언트에서 사용하기 위해서는, 먼저 사용하여 달성되는, 통신 될 필요 tff.federated_broadcast
오퍼레이터.
예상대로 모든 일을 확인하기 위해, 우리가 지금보고에서 볼 수 measurements
, 실행의 각 라운드와 달라야합니다, 경우에도 같은과 실행 client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
구조화된 유형
연합 학습에서 훈련된 모델의 모델 가중치는 일반적으로 단일 텐서가 아닌 텐서 모음으로 표시됩니다. TFF, 이것은로 표시됩니다 tff.StructType
일반적으로 유용한 집계 공장은 구조화 된 유형을 수용 할 수 있어야합니다.
그러나, 위의 예에서, 우리는 단지 함께 일 tff.TensorType
객체입니다. 우리가 함께 집계 프로세스를 생성하기 이전 공장을 사용하려고하면 tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
, 우리는 이상한 오류로 인해 얻을 TensorFlow는 번식을 시도합니다 tf.Tensor
과 list
.
문제는 아닌 상수의 구조 텐서 곱, 우리는 상수 구조의 각 텐서 곱해야한다는 것이다. 이 문제에 대한 일반적인 해결책은 사용하는 tf.nest
생성 모듈의 내부 tff.tf_computation
들.
이전의 버전 ExampleTaskFactory
다음과 같이 구조화 된 유형과 호환 이렇게 본다 :
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
이 예제는 TFF 코드를 구성할 때 따라야 할 유용한 패턴을 강조합니다. 매우 간단한 작업을 처리하지 않을 경우 경우, 코드는보다 읽기하게 tff.tf_computation
, 안쪽 블록 건물로 사용됩니다의 tff.federated_computation
별도의 장소에 생성됩니다. 내부의 tff.federated_computation
,이 빌딩 블록은 고유 연산자를 사용하여 연결되어 있습니다.
예상대로 작동하는지 확인하려면:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
내부 집계
마지막 단계는 다른 집계 기술을 쉽게 구성할 수 있도록 선택적으로 실제 집계를 다른 팩토리에 위임할 수 있도록 하는 것입니다.
이 옵션으로 생성함으로써 달성 inner_factory
우리의 생성자에 인수 ExampleTaskFactory
. 지정되지 않으면, tff.aggregators.SumFactory
적용되는 사용 tff.federated_sum
이전 섹션에서 직접 사용할 연산자.
때 create
라고, 우리는 먼저 호출 할 수 있습니다 create
의 inner_factory
같은과 내부 통합 프로세스를 만들 수 value_type
.
에 의해 반환 프로세스의 상태 initialize_fn
"이"프로세스에 의해 생성 된 상태, 및 방금 생성 프로세스 내부의 상태의 두 부분의 조성물이다.
의 구현 next_fn
그 실제 응집 상이는 위임되어 next
내부 처리의 기능, 최종 출력이 이루어지는 방법이다. 상태는 다시 "이"및 "내측"상태로 구성되고, 측정은와 유사한 방식으로 구성된다 OrderedDict
.
다음은 그러한 패턴의 구현입니다.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
받는 사람 위임 할 때 inner_process.next
기능, 우리가 얻을 수익 구조는이다 tff.templates.MeasuredProcessOutput
- 같은 세 개의 필드, state
, result
및 measurements
. 합성 응집 공정의 전체 창 구조를 작성하면, state
및 measurements
분야는 일반적으로 구성과 함께 반환한다. 대조적으로, result
값은 대응 필드 응집되는 대신 이루어지는 응집 "흐른다".
state
객체는 공장의 구현 세부 사항으로 간주되어야하고, 따라서 조성물은 임의의 구조 일 수있다. 그러나 measurements
값이 어떤 점에서 사용자에게보고 할에 해당합니다. 따라서, 우리는 사용에 추천 OrderedDict
성분의 출처 메트릭보고 않는 경우 분명히있을 것이라고 같은 명명하여 구성된으로.
또한 사용합니다 tff.federated_zip
연산자를. state
생성 프로세스 contolled 목적은 있어야 tff.FederatedType
. 우리가 대신 반환 한 경우 (this_state, inner_state)
에 initialize_fn
, 그것의 반환 유형 서명은 것 tff.StructType
의 2 튜플 포함 tff.FederatedType
들. 의 사용 tff.federated_zip
"리프트"를 tff.FederatedType
최고 수준. 이는 유사하게 사용된다 next_fn
상태와 측정이 반환 될 준비 할 때.
마지막으로 이것이 기본 내부 집계와 함께 사용되는 방법을 볼 수 있습니다.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... 내부 집계가 다릅니다. 예를 들어, ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
요약
이 자습서에서는 집계 팩토리로 표시되는 범용 집계 빌딩 블록을 만들기 위해 따라야 할 모범 사례를 설명했습니다. 일반성은 다음 두 가지 방식으로 설계 의도를 통해 나타납니다.
- 매개변수화된 계산. 집계와 함께 작동하도록 설계된 다른 TFF 모듈에 연결 될 수있는 독립적 인 빌딩 블록
tff.aggregators
같은 그들의 필요한 응집 매개 변수화하기tff.learning.build_federated_averaging_process
. - 집계 구성. 집계 빌딩 블록은 다른 집계 빌딩 블록과 함께 구성되어 더 복잡한 복합 집계를 생성할 수 있습니다.