This tutorial will describe how to set up high-performance simulation using a TFF runtime running on Kubernetes. The model is the same as in the previous tutorial, High-performance simulations with TFF. The only difference is that here we use a worker pool instead of a local executor.
This tutorial refers to Google Cloud's GKE to create the Kubernetes cluster, but all the steps after the cluster is created can be used with any Kubernetes installation.
![]() |
![]() |
![]() |
GKE で TFF ワーカーを起動する
注: このチュートリアルは、ユーザーが既存の GCP プロジェクトを持っていることを前提としています。
Kubernetes クラスタの作成
次の手順は一度だけ実行する必要があります。クラスタは、今後のワークロードに再利用できます。
GKE の指示に従って、コンテナクラスタを作成します。このチュートリアルの以下の部分では、クラスタ名がtff-cluster
であると想定していますが、実際の名前は重要ではありません。「ステップ 5: アプリケーションのデプロイ」に到達したら、指示に従うのをやめます。
TFF ワーカーアプリケーションをデプロイする
GCP とやり取りするコマンドは、ローカルまたは Google Cloud Shell で実行できます。Google Cloud Shell では、追加の設定は必要ないため、Google Cloud Shell の使用をお勧めします。
- 次のコマンドを実行して、Kubernetes アプリケーションを起動します。
$ kubectl create deployment tff-workers --image=gcr.io/tensorflow-federated/remote-executor-service:{ {version} }
- アプリケーションのロードバランサを追加します。
$ kubectl expose deployment tff-workers --type=LoadBalancer --port 80 --target-port 8000
注: これにより、デプロイメントがインターネットに公開されますが、これはデモのみのためです。運用環境では、ファイアウォールと認証を強くお勧めします。
Google Cloud Console でロードバランサの IP アドレスを検索します。後でトレーニングループをワーカーアプリに接続するために必要になります。
(または) Docker コンテナをローカルで起動する
$ docker run --rm -p 8000:8000 gcr.io/tensorflow-federated/remote_executor_service:{ {version} }
TFF 環境の設定
pip install --upgrade tensorflow_federated
トレーニングするモデルの定義
import collections
import time
import tensorflow as tf
import tensorflow_federated as tff
source, _ = tff.simulation.datasets.emnist.load_data()
def map_fn(example):
return collections.OrderedDict(
x=tf.reshape(example['pixels'], [-1, 784]), y=example['label'])
def client_data(n):
ds = source.create_tf_dataset_for_client(source.client_ids[n])
return ds.repeat(10).batch(20).map(map_fn)
train_data = [client_data(n) for n in range(10)]
input_spec = train_data[0].element_spec
def model_fn():
model = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(784,)),
tf.keras.layers.Dense(units=10, kernel_initializer='zeros'),
tf.keras.layers.Softmax(),
])
return tff.learning.from_keras_model(
model,
input_spec=input_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.02))
def evaluate(num_rounds=10):
state = trainer.initialize()
for round in range(num_rounds):
t1 = time.time()
state, metrics = trainer.next(state, train_data)
t2 = time.time()
print('Round {}: loss {}, round time {}'.format(round, metrics.loss, t2 - t1))
リモートエグゼキュータのセットアップ
デフォルトでは、TFF はすべての計算をローカルで実行します。このステップでは、上で設定した Kubernetes サービスに接続するよう TFF に指示します。サービスの IP アドレスは必ずここにコピーします。
import grpc
ip_address = '0.0.0.0'
port = 80
client_ex = []
for i in range(10):
channel = grpc.insecure_channel('{}:{}'.format(ip_address, port))
client_ex.append(tff.framework.RemoteExecutor(channel, rpc_mode='STREAMING'))
factory = tff.framework.worker_pool_executor_factory(client_ex)
context = tff.framework.ExecutionContext(factory)
tff.framework.set_default_context(context)
トレーニングの実行
evaluate()
Round 0: loss 4.370407581329346, round time 4.201097726821899 Round 1: loss 4.1407670974731445, round time 3.3283166885375977 Round 2: loss 3.865147590637207, round time 3.098310947418213 Round 3: loss 3.534019708633423, round time 3.1565616130828857 Round 4: loss 3.272688388824463, round time 3.175067663192749 Round 5: loss 2.935391664505005, round time 3.008434534072876 Round 6: loss 2.7399251461029053, round time 3.31435227394104 Round 7: loss 2.5054931640625, round time 3.4411356449127197 Round 8: loss 2.290508985519409, round time 3.158798933029175 Round 9: loss 2.1194536685943604, round time 3.1348156929016113