フェデレーテッドラーニングリサーチの TFF: モデルと更新圧縮

注意: この Colab は 最新リリースバージョンtensorflow_federated pip パッケージでの動作が確認されていますが、Tensorflow Federated プロジェクトは現在もプレリリース開発の段階にあるため、master では動作しない可能性があります。

このチュートリアルでは、EMNIST データセットを使用しながら、tff.learning.build_federated_averaging_process API と tensor_encoding API を使用するフェデレーテッドアベレージングアルゴリズムにおける通信コストを削減するために非可逆圧縮アルゴリズムを有効化する方法を実演します。フェデレーテッドアベレージングアルゴリズムの詳細については、論文「Communication-Efficient Learning of Deep Networks from Decentralized Data」をご覧ください。

始める前に

始める前に、次のコードを実行し、環境が正しくセットアップされていることを確認してください。挨拶文が表示されない場合は、インストールガイドで手順を確認してください。

!pip install --quiet --upgrade tensorflow_federated
!pip install --quiet --upgrade tensorflow-model-optimization

%load_ext tensorboard
import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

TFF が動作していることを確認します。

@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
b'Hello, World!'

入力データを準備する

このセクションでは、TFF に含まれる EMNIST データセットを読み込んで事前処理します。EMNIST データセットの詳細は、画像分類のフェデレーテッドラーニングチュートリアルをご覧ください。

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))

emnist_train = emnist_train.preprocess(preprocess_train_dataset)

モデルを定義する

ここでは、元の FedAvg CNN に基づいて Keras モデルを定義し、それを tff.learning.Model インスタンスにラッピングして TFF が消費できるようにします。

モデルのみを直接生成する代わりに、モデルを生成する関数が必要となることに注意してください。また、その関数は構築済みのモデルをキャプチャするだけでなく、呼び出されるコンテキストで作成する必要があります。これは、TFF がデバイスで利用されるように設計されており、リソースが作られるタイミングを制御することで、キャプチャしてパッケージ化できる必要があるためです。

def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0]).element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

モデルのトレーニングとトレーニングメトリックの出力

フェデレーテッドアベレージングアルゴリズムを作成し、定義済みのモデルを EMNIST データセットでトレーニングする準備が整いました。

まず、tff.learning.build_federated_averaging_process API を使用して、フェデレーテッドアベレージングアルゴリズムを構築する必要があります。

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

では、フェデレーテッドアベレージングアルゴリズムを実行しましょう。TFF の観点からフェデレーテッドアベレージングアルゴリズムを実行するには、次のようになります。

  1. アルゴリズムを初期化し、サーバーの初期状態を取得します。サーバーの状態には、アルゴリズムを実行するために必要な情報が含まれます。TFF は関数型であるため、この状態には、アルゴリズムが使用するオプティマイザの状態(慣性項)だけでなく、モデルパラメータ自体も含まれることを思い出してください。これらは引数として渡され、TFF 計算の結果として返されます。
  2. ラウンドごとにアルゴリズムを実行します。各ラウンドでは、新しいサーバーの状態が、データでモデルをトレーニングしている各クライアントの結果として返されます。通常、1 つのラウンドでは次のことが発生します。
    1. サーバーはすべての参加クライアントにモデルをブロードキャストします。
    2. 各クライアントは、モデルとそのデータに基づいて作業を実施します。
    3. サーバーはすべてのモデルを集約し、新しいモデルを含むサーバーの状態を生成します。

詳細については、カスタムフェデレーテッドアルゴリズム、パート 2: フェデレーテッドアベレージングの実装チュートリアルをご覧ください。

トレーニングメトリックは、トレーニング後に表示できるように、TensorBoard ディレクトリに書き込まれます。

Load utility functions

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and output metrics."""
  # Create a environment to get communication cost.
  environment = set_sizing_environment()

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample the clients parcitipated in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Round one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      # For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

      # Add metrics to Tensorboard.
      for name, value in metrics['train']._asdict().items():
          tf.summary.scalar(name, value, step=round_num)

      # Add broadcasted and aggregated data size to Tensorboard.
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()
# Clean the log directory to avoid conflicts.
!rm -R /tmp/logs/scalars/*

# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=10, summary_writer=summary_writer)
round  0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09433962404727936,loss=2.3181073665618896>>, broadcasted_bits=507.62MiB, aggregated_bits=507.62MiB
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.0765027329325676,loss=2.3148586750030518>>, broadcasted_bits=1015.24MiB, aggregated_bits=1015.24MiB
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08872458338737488,loss=2.3089394569396973>>, broadcasted_bits=1.49GiB, aggregated_bits=1.49GiB
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10852713137865067,loss=2.304060220718384>>, broadcasted_bits=1.98GiB, aggregated_bits=1.98GiB
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10818713158369064,loss=2.3026843070983887>>, broadcasted_bits=2.48GiB, aggregated_bits=2.48GiB
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10454985499382019,loss=2.300365447998047>>, broadcasted_bits=2.97GiB, aggregated_bits=2.97GiB
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12841254472732544,loss=2.29765248298645>>, broadcasted_bits=3.47GiB, aggregated_bits=3.47GiB
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14023210108280182,loss=2.2977216243743896>>, broadcasted_bits=3.97GiB, aggregated_bits=3.97GiB
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.15060241520404816,loss=2.29490327835083>>, broadcasted_bits=4.46GiB, aggregated_bits=4.46GiB
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13088512420654297,loss=2.2942349910736084>>, broadcasted_bits=4.96GiB, aggregated_bits=4.96GiB

上記に示されるルートログディレクトリで TensorBoard を起動すると、トレーニングメトリックが表示されます。データの読み込みには数秒かかることがあります。Loss と Accuracy を除き、ブロードキャストされ集約されたデータの量も出力されます。ブロードキャストされたデータは、各クライアントにサーバーがプッシュしたテンソルで、集約データとは各クライアントがサーバーに返すテンソルを指します。

%tensorboard --logdir /tmp/logs/scalars/ --port=0

カスタムブロードキャストと集約関数を構築する

tensor_encoding API を使用して、ブロードキャストされたデータと集約データに対して非可逆圧縮アルゴリズムを使用する関数を実装しましょう。

まず、2 つの関数を定義します。

  • broadcast_encoder_fn: サーバーのテンソルまたは変数をクライアント通信にエンコードする te.core.SimpleEncoder のインスタンスを作成します(ブロードキャストデータ)。
  • mean_encoder_fn: クライアントのテンソルまたは変数をサーバー通信にエンコードする te.core.GatherEncoder インスタンスを作成します(集約データ)。

一度にモデル全体に圧縮メソッドを適用しないことに十分に注意してください。モデルの各変数を圧縮するかどうか、またはどのように圧縮するかは、個別に決定します。これは一般的に、バイアスなどの小さな変数は不正確性により敏感であり、比較的小さいことから、潜在的な通信の節約量も比較的小さくなるためです。そのため、デフォルトでは小さな変数を圧縮しません。この例では、10000 個を超える要素を持つ変数ごとに 8 ビット(256 バケット)の均一量子化を適用し、ほかの変数にのみ ID を適用します。

def broadcast_encoder_fn(value):
  """Function for building encoded broadcast."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_simple_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(value):
  """Function for building encoded mean."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_gather_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

TFF は、エンコーダ関数を tff.learning.build_federated_averaging_process API が消費できる形式に変換する API を提供しています。tff.learning.framework.build_encoded_broadcast_from_modeltff.learning.framework.build_encoded_mean_from_model を使用することで、tff.learning.build_federated_averaging_processbroadcast_processaggregation_process 引数に渡して、非可逆圧縮アルゴリズムでフェデレーテッドアベレージングアルゴリズムを作成するための関数を 2 つ作成することができます。

encoded_broadcast_process = (
    tff.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn, broadcast_encoder_fn))
encoded_mean_process = (
    tff.learning.framework.build_encoded_mean_process_from_model(
    tff_model_fn, mean_encoder_fn))

federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process=encoded_broadcast_process,
    aggregation_process=encoded_mean_process)

もう一度モデルをトレーニングする

では、新しいフェデレーテッドアベレージングアルゴリズムを実行しましょう。

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

train(federated_averaging_process=federated_averaging_with_compression, 
      num_rounds=10,
      num_clients_per_round=10,
      summary_writer=summary_writer_for_compression)
round  0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08722109347581863,loss=2.3216357231140137>>, broadcasted_bits=146.46MiB, aggregated_bits=146.46MiB
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08379272371530533,loss=2.3108291625976562>>, broadcasted_bits=292.92MiB, aggregated_bits=292.92MiB
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08834951370954514,loss=2.3074147701263428>>, broadcasted_bits=439.38MiB, aggregated_bits=439.39MiB
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10467479377985,loss=2.305814027786255>>, broadcasted_bits=585.84MiB, aggregated_bits=585.85MiB
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09853658825159073,loss=2.3012874126434326>>, broadcasted_bits=732.30MiB, aggregated_bits=732.31MiB
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14904330670833588,loss=2.3005223274230957>>, broadcasted_bits=878.77MiB, aggregated_bits=878.77MiB
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13152804970741272,loss=2.2985599040985107>>, broadcasted_bits=1.00GiB, aggregated_bits=1.00GiB
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12392637878656387,loss=2.297102451324463>>, broadcasted_bits=1.14GiB, aggregated_bits=1.14GiB
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13289350271224976,loss=2.2944107055664062>>, broadcasted_bits=1.29GiB, aggregated_bits=1.29GiB
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12661737203598022,loss=2.2971296310424805>>, broadcasted_bits=1.43GiB, aggregated_bits=1.43GiB

もう一度 TensorBoard を起動して、2 つの実行のトレーニングメトリックを比較します。

Tensorboard を見てわかるように、broadcasted_bitsaggregated_bits 図 の orginialcompression の曲線に大きな減少を確認できます。losssparse_categorical_accuracy 図では、この 2 つの曲線は非常に似通っていました。

最後に、元のフェデレーテッドアベレージングアルゴリズムに似たパフォーマンスを達成できる圧縮アルゴリズムを実装しながら、通信コストを大幅に削減することができました。

%tensorboard --logdir /tmp/logs/scalars/ --port=0

演習

カスタム圧縮アルゴリズムを実装してトレーニングループに適用するには、次の手順に従います。

  1. 新しい圧縮アルゴリズムを EncodingStageInterface のサブクラスとして実装します。または、この例のようにより一般的なバリアントAdaptiveEncodingStageInterfaceとして実装します。
  2. 新しい Encoder を作成し、モデルブロードキャストまたはモデル更新の平均化に特化させます。
  3. これらのオブジェクトを使用して、トレーニング計算全体を構築します。

潜在的に価値の高いオープンリサーチの問いには、非均一量子化、ハフマンコーディングなどの可逆圧縮、および以前のトレーニングラウンドからの情報に基づいて圧縮を適応させるメカニズムが含まれます。

次は、推奨される読み物です。