페더레이션 학습 연구를 위한 TFF: 압축 모델링 및 업데이트

참고: 이 colab은 tensorflow_federated pip 패키지의 최신 릴리즈 버전에서 동작하는 것으로 확인되었지만, Tensorflow Federated 프로젝트는 아직 릴리즈 전 개발 중이며 master에서 동작하지 않을 수 있습니다.

이 튜토리얼에서는 EMNIST 데이터세트를 사용하여 tff.learning.build_federated_averaging_process API와 tensor_encoding API를 사용하는 Federated Averaging 알고리즘의 통신 비용을 줄이기 위해 손실 압축 알고리즘을 지원합니다. Federated Averaging 알고리즘에 대한 자세한 내용은 분산 데이터에서 심층 네트워크의 통신 효율적인 학습 논문을 참조하세요.

시작하기 전에

시작하기 전에 다음을 실행하여 환경이 올바르게 설정되었는지 확인합니다. 인사말이 표시되지 않으면 설치 가이드에서 지침을 참조하세요.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

%load_ext tensorboard
import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

TFF가 동작하는지 확인합니다.

@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
b'Hello, World!'

입력 데이터 준비하기

이 섹션에서는 TFF에 포함된 EMNIST 데이터세트를 로드하고 전처리합니다. EMNIST 데이터세트에 대한 자세한 내용은 이미지 분류를 위한 Federated Learning 튜토리얼을 확인하세요.

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))

emnist_train = emnist_train.preprocess(preprocess_train_dataset)

모델 정의하기

여기에서는 원래 FedAvg CNN을 기반으로 keras 모델을 정의한 다음 TFF에서 사용할 수 있도록 tff.learning.Model 인스턴스에 keras 모델을 래핑합니다.

단순히 모델을 직접 생성하는 대신 모델을 생성하는 함수가 필요합니다. 또한, 이 함수는 미리 구성된 모델을 캡처할 수 없습니다. 함수가 호출된 컨텍스트에서 모델을 만들어야 합니다. 그 이유는 TFF는 기기로 이동하도록 설계되었으며 리소스를 캡처하고 패키징할 수 있도록 리소스의 구성 시기를 제어해야 하기 때문입니다.

def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0]).element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

모델 훈련 및 훈련 메트릭 출력

이제 Federated Averaging 알고리즘을 구성하고 EMNIST 데이터세트에 대해 정의된 모델을 훈련할 준비가 되었습니다.

먼저 tff.learning.build_federated_averaging_process API를 사용하여 Federated Averaging 알고리즘을 빌드해야 합니다.

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

이제 Federated Averaging 알고리즘을 실행해 보겠습니다. TFF의 관점에서 Federated Learning 알고리즘의 실행은 다음과 같습니다.

  1. 알고리즘을 초기화하고 초기 서버 상태를 가져옵니다. 서버 상태에는 알고리즘을 수행하는 데 필요한 정보가 포함되어 있습니다. TFF는 함수형이므로 이 상태에는 알고리즘이 사용하는 모든 최적화 상태(예: momentum 조건)와 모델 매개변수 자체가 모두 포함됩니다. 이 상태는 인수로 전달되고 TFF 계산의 결과로 반환됩니다.
  2. 알고리즘을 라운드별로 실행합니다. 각 라운드에서 각 클라이언트가 데이터에 대해 모델을 훈련한 결과로서 새로운 서버 상태가 반환됩니다. 일반적으로 한 라운드에서 다음과 같습니다.
    1. 서버는 모든 참여 클라이언트로 모델을 브로드캐스팅합니다.
    2. 각 클라이언트는 모델과 자체 데이터를 기반으로 작업을 수행합니다.
    3. 서버는 모든 모델을 집계하여 새 모델을 포함하는 서버 상태를 생성합니다.

자세한 내용은 사용자 정의 Federated Algorithm, 2부: Federated Averaging 구현하기 튜토리얼을 참조하세요.

훈련 메트릭은 훈련 후 표시하기 위해 Tensorboard 디렉토리에 기록됩니다.

Load utility functions

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and output metrics."""
  # Create a environment to get communication cost.
  environment = set_sizing_environment()

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample the clients parcitipated in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Round one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      # For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

      # Add metrics to Tensorboard.
      for name, value in metrics['train']._asdict().items():
          tf.summary.scalar(name, value, step=round_num)

      # Add broadcasted and aggregated data size to Tensorboard.
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()
# Clean the log directory to avoid conflicts.
!rm -R /tmp/logs/scalars/*

# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=10, summary_writer=summary_writer)
round  0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09433962404727936,loss=2.3181073665618896>>, broadcasted_bits=507.62MiB, aggregated_bits=507.62MiB
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.0765027329325676,loss=2.3148586750030518>>, broadcasted_bits=1015.24MiB, aggregated_bits=1015.24MiB
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08872458338737488,loss=2.3089394569396973>>, broadcasted_bits=1.49GiB, aggregated_bits=1.49GiB
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10852713137865067,loss=2.304060220718384>>, broadcasted_bits=1.98GiB, aggregated_bits=1.98GiB
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10818713158369064,loss=2.3026843070983887>>, broadcasted_bits=2.48GiB, aggregated_bits=2.48GiB
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10454985499382019,loss=2.300365447998047>>, broadcasted_bits=2.97GiB, aggregated_bits=2.97GiB
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12841254472732544,loss=2.29765248298645>>, broadcasted_bits=3.47GiB, aggregated_bits=3.47GiB
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14023210108280182,loss=2.2977216243743896>>, broadcasted_bits=3.97GiB, aggregated_bits=3.97GiB
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.15060241520404816,loss=2.29490327835083>>, broadcasted_bits=4.46GiB, aggregated_bits=4.46GiB
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13088512420654297,loss=2.2942349910736084>>, broadcasted_bits=4.96GiB, aggregated_bits=4.96GiB

위에서 지정한 루트 로그 디렉터리로 TensorBoard를 시작하여 훈련 메트릭을 표시합니다. 데이터를 로드하는 데 몇 초 정도 걸릴 수 있습니다. 손실 및 정확성을 제외하고는 브로트캐스팅 및 집계된 데이터의 양도 출력합니다. 브로드캐스팅된 데이터는 서버가 각 클라이언트에 푸시하는 텐서를 의미하며, 집계 데이터는 각 클라이언트가 서버에 반환하는 텐서를 의미합니다.

%tensorboard --logdir /tmp/logs/scalars/ --port=0

사용자 정의 브로드캐스트 및 집계 함수 빌드하기

이제 tensor_encoding API를 사용하여 브로드캐스팅된 데이터와 집계된 데이터에 손실 압축 알고리즘을 사용하는 함수를 구현해 보겠습니다.

먼저 두 가지 함수를 정의합니다.

  • broadcast_encoder_fnte.core.SimpleEncoder의 인스턴스를 생성하여 서버에서 클라이언트로의 통신(브로드캐스트 데이터)에서 텐서 또는 변수를 인코딩합니다.
  • mean_encoder_fnte.core.GatherEncoder의 인스턴스를 생성하여 클라이언트에서 서버로의 통신(집계 데이터)에서 텐서 또는 변수를 인코딩합니다.

한 번에 전체 모델에 압축 메서드를 적용하지 않는다는 점에 유의해야 합니다. 대신 모델의 각 변수를 독립적으로 압축하는 방법(및 압축 여부)을 결정합니다. 그 이유는 일반적으로 바이어스와 같은 작은 변수는 부정확성에 더 민감하고 상대적으로 작기 때문에 잠재적인 통신 비용 절감도 상대적으로 적기 때문입니다. 따라서 기본적으로 작은 변수는 압축하지 않습니다. 이 예제에서는 요소가 10000개 이상인 모든 변수에 대해 8bit(256개 버킷)에 균일한 양자화를 적용하고 다른 변수에만 ID를 적용합니다.

def broadcast_encoder_fn(value):
  """Function for building encoded broadcast."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_simple_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(value):
  """Function for building encoded mean."""
  spec = tf.TensorSpec(value.shape, value.dtype)
  if value.shape.num_elements() > 10000:
    return te.encoders.as_gather_encoder(
        te.encoders.uniform_quantization(bits=8), spec)
  else:
    return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

TFF는 encoder 함수를 tff.learning.build_federated_averaging_process API가 사용할 수 있는 형식으로 변환하는 API를 제공합니다. tff.learning.framework.build_encoded_broadcast_from_modeltff.learning.framework.build_encoded_mean_from_model을 사용하여 tff.learning.build_federated_averaging_processbroadcast_processaggregation_process 인수로 전달될 수 있는 두 개의 함수를 생성하여 손실 압축 알고리즘으로 Federated Averaging 알고리즘을 만듭니다.

encoded_broadcast_process = (
    tff.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn, broadcast_encoder_fn))
encoded_mean_process = (
    tff.learning.framework.build_encoded_mean_process_from_model(
    tff_model_fn, mean_encoder_fn))

federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process=encoded_broadcast_process,
    aggregation_process=encoded_mean_process)

모델 다시 훈련하기

이제 새로운 Federated Averaging 알고리즘을 실행해 보겠습니다.

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

train(federated_averaging_process=federated_averaging_with_compression, 
      num_rounds=10,
      num_clients_per_round=10,
      summary_writer=summary_writer_for_compression)
round  0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08722109347581863,loss=2.3216357231140137>>, broadcasted_bits=146.46MiB, aggregated_bits=146.46MiB
round  1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08379272371530533,loss=2.3108291625976562>>, broadcasted_bits=292.92MiB, aggregated_bits=292.92MiB
round  2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08834951370954514,loss=2.3074147701263428>>, broadcasted_bits=439.38MiB, aggregated_bits=439.39MiB
round  3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10467479377985,loss=2.305814027786255>>, broadcasted_bits=585.84MiB, aggregated_bits=585.85MiB
round  4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09853658825159073,loss=2.3012874126434326>>, broadcasted_bits=732.30MiB, aggregated_bits=732.31MiB
round  5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14904330670833588,loss=2.3005223274230957>>, broadcasted_bits=878.77MiB, aggregated_bits=878.77MiB
round  6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13152804970741272,loss=2.2985599040985107>>, broadcasted_bits=1.00GiB, aggregated_bits=1.00GiB
round  7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12392637878656387,loss=2.297102451324463>>, broadcasted_bits=1.14GiB, aggregated_bits=1.14GiB
round  8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13289350271224976,loss=2.2944107055664062>>, broadcasted_bits=1.29GiB, aggregated_bits=1.29GiB
round  9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12661737203598022,loss=2.2971296310424805>>, broadcasted_bits=1.43GiB, aggregated_bits=1.43GiB

TensorBoard를 다시 시작하여 두 실행 간의 훈련 메트릭을 비교합니다.

Tensorboard에서 볼 수 있듯이, broadcasted_bitsaggregated_bits 플롯에서 orginialcompression 곡선 간에 상당한 감소가 있는 반면 losssparse_categorical_accuracy 플롯에서는 두 곡선이 매우 유사합니다.

결론적으로, 원래의 Federated Averaging 알고리즘과 유사한 성능을 달성 할 수 있는 압축 알고리즘을 구현하면서 통신 비용은 대폭 절감했습니다.

%tensorboard --logdir /tmp/logs/scalars/ --port=0

연습

사용자 정의 압축 알고리즘을 구현하고 훈련 루프에 적용하기 위해 다음을 수행할 수 있습니다.

  1. 이 예제에 따라 EncodingStageInterface의 서브 클래스 또는 보다 일반적인 변형인 AdaptiveEncodingStageInterface로 새 압축 알고리즘을 구현합니다.
  2. Encoder를 구성하고 모델 브로드캐스팅 또는 모델 업데이트 평균화를 위해 특수화합니다.
  3. 이들 객체를 사용하여 전체 훈련 계산을 빌드합니다.

잠재적으로 가치 있는 개방형 연구 질문에는 불균일 양자화, 허프만 코딩과 같은 무손실 압축, 이전 훈련 라운드의 정보를 기반으로 한 압축을 조정하는 메커니즘이 포함됩니다.

권장 참고 자료: