TensorFlow.orgで表示 | GoogleColabで実行 | GitHubでソースを表示 | ノートブックをダウンロード |
このチュートリアルでは、後ろの設計原理を説明tff.aggregators
モジュールとサーバにクライアントから値のカスタム集計を実装するためのベストプラクティス。
前提条件。このチュートリアルでは、すでにの基本的な概念に精通している前提としていフェデレーションコアな配置(としてtff.SERVER
、 tff.CLIENTS
TFFは計算を表し方法)、( tff.tf_computation
、 tff.federated_computation
)とそれらの型シグネチャ。
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
設計概要
TFFにおいて、「凝集」とは、上の値のセットの移動を指すtff.CLIENTS
上の同じタイプの集計値を生成するtff.SERVER
。つまり、個々のクライアントの値が利用可能である必要はありません。たとえば、連合学習では、クライアントモデルの更新が平均化されて、サーバー上のグローバルモデルに適用される集約モデルの更新が取得されます。
例えば、この目的を達成するオペレータに加えてtff.federated_sum
、TFFを提供tff.templates.AggregationProcess
(ステートフル・プロセスが単純な合計よりも複雑な形態に一般化することができるように、集約計算のための型シグネチャを形式化)。
主要な構成要素tff.aggregators
を作成するための工場であるモジュールAggregationProcess
二つの側面におけるTFFの一般的に有用と交換可能なビルディング・ブロックであるように設計されています:
- パラメータ化された計算。凝集はで動作するように設計された他のTFFモジュールにプラグインすることができる独立したビルディングブロックである
tff.aggregators
その必要集合をパラメータ化します。
例:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- 集約構成。アグリゲーションビルディングブロックを他のアグリゲーションビルディングブロックと組み合わせて、より複雑な複合アグリゲーションを作成できます。
例:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
このチュートリアルの残りの部分では、これら2つの目標がどのように達成されるかについて説明します。
集約プロセス
我々は最初の要約tff.templates.AggregationProcess
し、その作成するためのファクトリパターンで従ってください。
tff.templates.AggregationProcess
あるtff.templates.MeasuredProcess
集合に対して指定型シグネチャを有します。具体的には、 initialize
とnext
の機能は、以下の型シグネチャを有します。
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
(型の状態state_type
)サーバーに置かれなければなりません。 next
関数は、状態値が(型の集約される入力引数として取るvalue_type
クライアントに設置します)。 *
加重平均でインスタンスの重みのための手段の任意の他の入力引数。更新された状態オブジェクト、サーバーに配置された同じタイプの集計値、およびいくつかの測定値を返します。
状態の両方が実行の間に渡されることに注意してくださいnext
関数、及び報告された測定値は、特定の実行に応じて任意の情報を報告することを意図するものでnext
の機能を、空であってもよいです。それにもかかわらず、TFFの他の部分が従うべき明確な契約を持つためには、それらを明示的に指定する必要があります。
他のTFFモジュールは、インスタンス内のモデルの更新をtff.learning
、使用することが期待されるtff.templates.AggregationProcess
値を集計する方法をパラメータ化します。ただし、集計された値とその型シグネチャが正確に何であるかは、トレーニングされているモデルの他の詳細と、それを実行するために使用された学習アルゴリズムによって異なります。
計算の他の態様の凝集独立を作るために、我々は、工場のパターンを使用する-私たちは、適切な作成tff.templates.AggregationProcess
オブジェクトの関連する型シグネチャを集約するために一度呼び出すことによって、利用可能でcreate
ファクトリの方法。したがって、集約プロセスの直接処理は、この作成を担当するライブラリ作成者にのみ必要です。
集約プロセスファクトリ
重み付けされていない集計と重み付けされた集計には、2つの抽象基本ファクトリクラスがあります。そのcreate
方法集計される値の型シグネチャを取り返しtff.templates.AggregationProcess
ような値の集約のために。
作成されたプロセスtff.aggregators.UnweightedAggregationFactory
サーバに、(1)状態、指定型の(2)値の2つの入力引数を取りvalue_type
。
実装例があるtff.aggregators.SumFactory
。
作成されたプロセスtff.aggregators.WeightedAggregationFactory
指定されたタイプのサーバで(1)状態、(2)の値:3つの入力引数を取りvalue_type
と型の(3)重量weight_type
その呼び出すとき工場のユーザによって指定されるように、 create
方法。
実装例はあるtff.aggregators.MeanFactory
加重平均を計算します。
ファクトリパターンは、上記の最初の目標を達成する方法です。その集約は独立した構成要素です。たとえば、トレーニング可能なモデル変数を変更する場合、複雑な集計を必ずしも変更する必要はありません。以下のような方法で使用されるときにそれを表現する工場は、異なるタイプの署名で呼び出されるtff.learning.build_federated_averaging_process
。
作曲
一般的な集計プロセスでは、(a)クライアントでの値の前処理、(b)クライアントからサーバーへの値の移動、および(c)サーバーでの集計値の後処理をカプセル化できることを思い出してください。凝集組成物は、上述した第二の目的は、内部に実現さtff.aggregators
構造によってモジュール部分(b)は、別の集約工場に委任することができるように集約工場の実装。
単一のファクトリクラス内に必要なすべてのロジックを実装するのではなく、実装はデフォルトで、集約に関連する単一の側面に焦点を合わせています。必要に応じて、このパターンにより、ビルディングブロックを一度に1つずつ交換できます。
一例では、重み付けされtff.aggregators.MeanFactory
。その実装は、クライアントで提供された値と重みを乗算し、重み付きの値と重みの両方を個別に合計してから、重み付きの値の合計をサーバーでの重みの合計で除算します。代わりに直接使用して和を実現するtff.federated_sum
オペレータを、総和は、2つのインスタンスに委任されtff.aggregators.SumFactory
。
このような構造により、2つのデフォルトの合計を、合計を異なる方法で実現する異なるファクトリに置き換えることができます。例えば、 tff.aggregators.SecureSumFactory
、またはのカスタム実装tff.aggregators.UnweightedAggregationFactory
。逆に、時間は、 tff.aggregators.MeanFactory
、それ自体のような他の工場の内部凝集することができるtff.aggregators.clipping_factory
値は平均化の前にクリッピングされる場合、。
前回参照チューニング学習のための集計を推奨して既存の工場を使用して合成機構のreceommended用途のためのチュートリアルをtff.aggregators
モジュール。
例によるベストプラクティス
我々は説明しようとしているtff.aggregators
簡単な例のタスクを実装することにより詳細にコンセプトを、それが次第に一般的にします。学ぶ別の方法は、既存の工場の実装を調べることです。
import collections
import tensorflow as tf
import tensorflow_federated as tff
代わりに加算するvalue
、例えば、タスクが合計であるvalue * 2.0
し、次にによって和を分割2.0
。集計結果は、このように直接加算と数学的に等価であるvalue
(1)クライアントにスケーリング(2)サーバでunscaling(3)クライアントを横切って合計:、3つの部分から成ると考えることができます。
設計は、先に説明した後、ロジックは、のサブクラスとして実装されるtff.aggregators.UnweightedAggregationFactory
適切な作成、 tff.templates.AggregationProcess
所与value_type
集計ました:
最小限の実装
サンプルタスクの場合、必要な計算は常に同じであるため、stateを使用する必要はありません。それはこのように空にし、として表されるtff.federated_value((), tff.SERVER)
今のところ、同じことが測定にも当てはまります。
したがって、タスクの最小限の実装は次のとおりです。
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
すべてが期待どおりに機能するかどうかは、次のコードで確認できます。
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
ステートフルネスと測定
ステートフルネスはTFFで広く使用されており、反復的に実行され、反復ごとに変化すると予想される計算を表します。たとえば、学習計算の状態には、学習中のモデルの重みが含まれます。
集計計算で状態を使用する方法を説明するために、サンプルタスクを変更します。代わりに、乗算のvalue
することによって2.0
、我々は乗算、それ反復インデックスによって-凝集が実行された回数。
そのためには、状態の概念を通じて達成される反復インデックスを追跡する方法が必要です。 initialize_fn
、代わりに空の状態を作成する、我々は、スカラーゼロに状態を初期化します。その後、状態で使用することができるnext_fn
によって(1)インクリメント:三の段階で1.0
、(2)多重に使用するvalue
、および新しい更新された状態として(3)リターン。
これが完了すると、あなたは注意することがあります。しかし、上記と全く同じコードが期待通りにすべての作品を検証するために使用することができます。何かが実際に変わったことをどうやって知ることができますか?
良い質問!ここで、測定の概念が役立ちます。一般に、測定が単一の実行に関連した任意の値を報告することができnext
監視のために使用することができる機能を、。この場合には、とすることができるsummed_value
前の例から。つまり、「スケール解除」ステップの前の値であり、反復インデックスに依存する必要があります。繰り返しますが、これは実際には必ずしも有用ではありませんが、関連するメカニズムを示しています。
したがって、タスクに対するステートフルな回答は次のようになります。
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
ことを注意state
になるnext_fn
入力などがサーバーに置かれています。クライアントにそれを使用するためには、最初に使用して達成され、通信する必要があるtff.federated_broadcast
演算子を。
予想通り、すべての作品を検証するために、我々は今、報告を見ることができmeasurements
、実行の各ラウンドで異なるはず、たとえ同じで走行client_data
。
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
構造化タイプ
連合学習で訓練されたモデルのモデルの重みは、通常、単一のテンソルではなく、テンソルのコレクションとして表されます。 TFFにおいて、これは、として表されtff.StructType
と一般的に有用な凝集工場は構造タイプを受け入れることができるようにする必要があります。
しかし、上記の例では、我々は唯一と協力tff.TensorType
オブジェクト。我々が凝集プロセスを作成するために、以前の工場を使用しようとするとtff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
私たちは奇妙なので、エラーが発生しますTensorFlowは乗算しようとしますtf.Tensor
とlist
。
問題ではなく、一定でテンソルの構造を掛けると、我々は定数によって構造の各テンソルを乗算する必要があるということです。この問題に対する通常の解決方法は使用することですtf.nest
作成のモジュール内部tff.tf_computation
秒。
以前のバージョンExampleTaskFactory
次のように構造化タイプとの互換性は、このようになります。
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
この例では、TFFコードを構造化するときに従うのに役立つ可能性のあるパターンを強調しています。非常に簡単な操作で扱っていない場合は、コードが読みやすくなったときに、よりtff.tf_computation
内部のビルディングブロックとしての使用されることをtff.federated_computation
別の場所に作成されます。内部のtff.federated_computation
、これらのビルディングブロックは、固有の演算子を使用して接続されています。
期待どおりに機能することを確認するには:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
内部集計
最後のステップは、さまざまな集約手法を簡単に構成できるようにするために、オプションで実際の集約を他のファクトリに委任できるようにすることです。
これは、オプション作成することによって達成されるinner_factory
当社のコンストラクタで引数ExampleTaskFactory
。指定しない場合、 tff.aggregators.SumFactory
適用される、使用されるtff.federated_sum
前のセクションで直接使用演算子。
ときにcreate
と呼ばれ、我々は最初に呼び出すことができますcreate
のinner_factory
同じで、内部凝集プロセスを作成するvalue_type
。
返された私たちのプロセスの状態initialize_fn
「この」プロセスによって作成された状態、および作成した内部プロセスの状態:二つの部分の組成物です。
実装next_fn
実際の凝集がに委任されている点で異なるnext
インナープロセスの機能、及び最終的な出力を構成する方法です。状態が再び「この」及び「内側」状態で構成され、測定値はと同様に構成されているOrderedDict
。
以下は、そのようなパターンの実装です。
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
委任する場合inner_process.next
機能、我々が得るリターン構造があるtff.templates.MeasuredProcessOutput
-同じ3つのフィールドで、 state
、 result
およびmeasurements
。構成凝集プロセスの全体的なリターン構造を作成する場合、 state
及びmeasurements
フィールドは、一般的に構成され、一緒に返されるべきです。対照的に、 result
値に対応するフィールドを集約して、代わりに成る集合「流れます」。
state
オブジェクトは、ファクトリの実装の詳細として見られるべきであり、従ってこの組成物は、任意の構造のものであってもよいです。しかし、 measurements
ある時点でユーザに報告される値に対応します。したがって、我々は、使用することをお勧めしますOrderedDict
構成は、どこ組成のメトリックはから来て報告しない明確になるよう命名し、。
また、使用に注意してくださいtff.federated_zip
演算子を。 state
作成プロセスによってcontolledオブジェクトがあるべきtff.FederatedType
。私たちが代わりに戻っていた場合(this_state, inner_state)
にinitialize_fn
、その戻り値の型シグネチャは次のようになりtff.StructType
の2要素のタプル含むtff.FederatedType
秒。使用tff.federated_zip
「リフト」 tff.FederatedType
トップレベルに。これは、同様に使用されnext_fn
返される状態の測定を調製する場合。
最後に、これをデフォルトの内部集計でどのように使用できるかを確認できます。
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
...そして異なる内部集合体で。たとえば、 ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
概要
このチュートリアルでは、アグリゲーションファクトリとして表される汎用アグリゲーションビルディングブロックを作成するために従うべきベストプラクティスについて説明しました。一般性は、次の2つの方法で設計意図によってもたらされます。
- パラメータ化された計算。凝集はで動作するように設計された他のTFFモジュールにプラグインすることができる独立したビルディングブロックである
tff.aggregators
ようなそれらの必要な集合をパラメータ化するために、tff.learning.build_federated_averaging_process
。 - 集約構成。アグリゲーションビルディングブロックを他のアグリゲーションビルディングブロックと組み合わせて、より複雑な複合アグリゲーションを作成できます。