참고: 이 colab은 tensorflow_federated
pip 패키지의 최신 릴리즈 버전에서 동작하는 것으로 확인되었지만, Tensorflow Federated 프로젝트는 아직 릴리즈 전 개발 중이며 master
에서 동작하지 않을 수 있습니다.
이 튜토리얼에서는 EMNIST 데이터세트를 사용하여 tff.learning.build_federated_averaging_process
API와 tensor_encoding API를 사용하는 Federated Averaging 알고리즘의 통신 비용을 줄이기 위해 손실 압축 알고리즘을 지원합니다. Federated Averaging 알고리즘에 대한 자세한 내용은 분산 데이터에서 심층 네트워크의 통신 효율적인 학습 논문을 참조하세요.
시작하기 전에
시작하기 전에 다음을 실행하여 환경이 올바르게 설정되었는지 확인합니다. 인사말이 표시되지 않으면 설치 가이드에서 지침을 참조하세요.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te
TFF가 동작하는지 확인합니다.
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
b'Hello, World!'
입력 데이터 준비하기
이 섹션에서는 TFF에 포함된 EMNIST 데이터세트를 로드하고 전처리합니다. EMNIST 데이터세트에 대한 자세한 내용은 이미지 분류를 위한 Federated Learning 튜토리얼을 확인하세요.
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
only_digits=True)
def reshape_emnist_element(element):
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
"""Preprocessing function for the EMNIST training dataset."""
return (dataset
# Shuffle according to the largest client dataset
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
# Repeat to do multiple local epochs
.repeat(CLIENT_EPOCHS_PER_ROUND)
# Batch to a fixed client batch size
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
# Preprocessing step
.map(reshape_emnist_element))
emnist_train = emnist_train.preprocess(preprocess_train_dataset)
모델 정의하기
여기에서는 원래 FedAvg CNN을 기반으로 keras 모델을 정의한 다음 TFF에서 사용할 수 있도록 tff.learning.Model 인스턴스에 keras 모델을 래핑합니다.
단순히 모델을 직접 생성하는 대신 모델을 생성하는 함수가 필요합니다. 또한, 이 함수는 미리 구성된 모델을 캡처할 수 없습니다. 함수가 호출된 컨텍스트에서 모델을 만들어야 합니다. 그 이유는 TFF는 기기로 이동하도록 설계되었으며 리소스를 캡처하고 패키징할 수 있도록 리소스의 구성 시기를 제어해야 하기 때문입니다.
def create_original_fedavg_cnn_model(only_digits=True):
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
data_format = 'channels_last'
max_pool = functools.partial(
tf.keras.layers.MaxPooling2D,
pool_size=(2, 2),
padding='same',
data_format=data_format)
conv2d = functools.partial(
tf.keras.layers.Conv2D,
kernel_size=5,
padding='same',
data_format=data_format,
activation=tf.nn.relu)
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
conv2d(filters=32),
max_pool(),
conv2d(filters=64),
max_pool(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation=tf.nn.relu),
tf.keras.layers.Dense(10 if only_digits else 62),
tf.keras.layers.Softmax(),
])
return model
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0]).element_spec
def tff_model_fn():
keras_model = create_original_fedavg_cnn_model()
return tff.learning.from_keras_model(
keras_model=keras_model,
input_spec=input_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
모델 훈련 및 훈련 메트릭 출력
이제 Federated Averaging 알고리즘을 구성하고 EMNIST 데이터세트에 대해 정의된 모델을 훈련할 준비가 되었습니다.
먼저 tff.learning.build_federated_averaging_process API를 사용하여 Federated Averaging 알고리즘을 빌드해야 합니다.
federated_averaging = tff.learning.build_federated_averaging_process(
model_fn=tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
이제 Federated Averaging 알고리즘을 실행해 보겠습니다. TFF의 관점에서 Federated Learning 알고리즘의 실행은 다음과 같습니다.
- 알고리즘을 초기화하고 초기 서버 상태를 가져옵니다. 서버 상태에는 알고리즘을 수행하는 데 필요한 정보가 포함되어 있습니다. TFF는 함수형이므로 이 상태에는 알고리즘이 사용하는 모든 최적화 상태(예: momentum 조건)와 모델 매개변수 자체가 모두 포함됩니다. 이 상태는 인수로 전달되고 TFF 계산의 결과로 반환됩니다.
- 알고리즘을 라운드별로 실행합니다. 각 라운드에서 각 클라이언트가 데이터에 대해 모델을 훈련한 결과로서 새로운 서버 상태가 반환됩니다. 일반적으로 한 라운드에서 다음과 같습니다.
- 서버는 모든 참여 클라이언트로 모델을 브로드캐스팅합니다.
- 각 클라이언트는 모델과 자체 데이터를 기반으로 작업을 수행합니다.
- 서버는 모든 모델을 집계하여 새 모델을 포함하는 서버 상태를 생성합니다.
자세한 내용은 사용자 정의 Federated Algorithm, 2부: Federated Averaging 구현하기 튜토리얼을 참조하세요.
훈련 메트릭은 훈련 후 표시하기 위해 Tensorboard 디렉토리에 기록됩니다.
Load utility functions
def format_size(size):
"""A helper function for creating a human-readable size."""
size = float(size)
for unit in ['B','KiB','MiB','GiB']:
if size < 1024.0:
return "{size:3.2f}{unit}".format(size=size, unit=unit)
size /= 1024.0
return "{size:.2f}{unit}".format(size=size, unit='TiB')
def set_sizing_environment():
"""Creates an environment that contains sizing information."""
# Creates a sizing executor factory to output communication cost
# after the training finishes. Note that sizing executor only provides an
# estimate (not exact) of communication cost, and doesn't capture cases like
# compression of over-the-wire representations. However, it's perfect for
# demonstrating the effect of compression in this tutorial.
sizing_factory = tff.framework.sizing_executor_factory()
# TFF has a modular runtime you can configure yourself for various
# environments and purposes, and this example just shows how to configure one
# part of it to report the size of things.
context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
tff.framework.set_default_context(context)
return sizing_factory
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
"""Trains the federated averaging process and output metrics."""
# Create a environment to get communication cost.
environment = set_sizing_environment()
# Initialize the Federated Averaging algorithm to get the initial server state.
state = federated_averaging_process.initialize()
with summary_writer.as_default():
for round_num in range(num_rounds):
# Sample the clients parcitipated in this round.
sampled_clients = np.random.choice(
emnist_train.client_ids,
size=num_clients_per_round,
replace=False)
# Create a list of `tf.Dataset` instances from the data of sampled clients.
sampled_train_data = [
emnist_train.create_tf_dataset_for_client(client)
for client in sampled_clients
]
# Round one round of the algorithm based on the server state and client data
# and output the new state and metrics.
state, metrics = federated_averaging_process.next(state, sampled_train_data)
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
size_info = environment.get_size_info()
broadcasted_bits = size_info.broadcast_bits[-1]
aggregated_bits = size_info.aggregate_bits[-1]
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))
# Add metrics to Tensorboard.
for name, value in metrics['train']._asdict().items():
tf.summary.scalar(name, value, step=round_num)
# Add broadcasted and aggregated data size to Tensorboard.
tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
summary_writer.flush()
# Clean the log directory to avoid conflicts.
!rm -R /tmp/logs/scalars/*
# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
num_clients_per_round=10, summary_writer=summary_writer)
round 0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09433962404727936,loss=2.3181073665618896>>, broadcasted_bits=507.62MiB, aggregated_bits=507.62MiB round 1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.0765027329325676,loss=2.3148586750030518>>, broadcasted_bits=1015.24MiB, aggregated_bits=1015.24MiB round 2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08872458338737488,loss=2.3089394569396973>>, broadcasted_bits=1.49GiB, aggregated_bits=1.49GiB round 3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10852713137865067,loss=2.304060220718384>>, broadcasted_bits=1.98GiB, aggregated_bits=1.98GiB round 4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10818713158369064,loss=2.3026843070983887>>, broadcasted_bits=2.48GiB, aggregated_bits=2.48GiB round 5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10454985499382019,loss=2.300365447998047>>, broadcasted_bits=2.97GiB, aggregated_bits=2.97GiB round 6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12841254472732544,loss=2.29765248298645>>, broadcasted_bits=3.47GiB, aggregated_bits=3.47GiB round 7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14023210108280182,loss=2.2977216243743896>>, broadcasted_bits=3.97GiB, aggregated_bits=3.97GiB round 8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.15060241520404816,loss=2.29490327835083>>, broadcasted_bits=4.46GiB, aggregated_bits=4.46GiB round 9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13088512420654297,loss=2.2942349910736084>>, broadcasted_bits=4.96GiB, aggregated_bits=4.96GiB
위에서 지정한 루트 로그 디렉터리로 TensorBoard를 시작하여 훈련 메트릭을 표시합니다. 데이터를 로드하는 데 몇 초 정도 걸릴 수 있습니다. 손실 및 정확성을 제외하고는 브로트캐스팅 및 집계된 데이터의 양도 출력합니다. 브로드캐스팅된 데이터는 서버가 각 클라이언트에 푸시하는 텐서를 의미하며, 집계 데이터는 각 클라이언트가 서버에 반환하는 텐서를 의미합니다.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
사용자 정의 브로드캐스트 및 집계 함수 빌드하기
이제 tensor_encoding API를 사용하여 브로드캐스팅된 데이터와 집계된 데이터에 손실 압축 알고리즘을 사용하는 함수를 구현해 보겠습니다.
먼저 두 가지 함수를 정의합니다.
broadcast_encoder_fn
은 te.core.SimpleEncoder의 인스턴스를 생성하여 서버에서 클라이언트로의 통신(브로드캐스트 데이터)에서 텐서 또는 변수를 인코딩합니다.mean_encoder_fn
은 te.core.GatherEncoder의 인스턴스를 생성하여 클라이언트에서 서버로의 통신(집계 데이터)에서 텐서 또는 변수를 인코딩합니다.
한 번에 전체 모델에 압축 메서드를 적용하지 않는다는 점에 유의해야 합니다. 대신 모델의 각 변수를 독립적으로 압축하는 방법(및 압축 여부)을 결정합니다. 그 이유는 일반적으로 바이어스와 같은 작은 변수는 부정확성에 더 민감하고 상대적으로 작기 때문에 잠재적인 통신 비용 절감도 상대적으로 적기 때문입니다. 따라서 기본적으로 작은 변수는 압축하지 않습니다. 이 예제에서는 요소가 10000개 이상인 모든 변수에 대해 8bit(256개 버킷)에 균일한 양자화를 적용하고 다른 변수에만 ID를 적용합니다.
def broadcast_encoder_fn(value):
"""Function for building encoded broadcast."""
spec = tf.TensorSpec(value.shape, value.dtype)
if value.shape.num_elements() > 10000:
return te.encoders.as_simple_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)
def mean_encoder_fn(value):
"""Function for building encoded mean."""
spec = tf.TensorSpec(value.shape, value.dtype)
if value.shape.num_elements() > 10000:
return te.encoders.as_gather_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)
TFF는 encoder 함수를 tff.learning.build_federated_averaging_process
API가 사용할 수 있는 형식으로 변환하는 API를 제공합니다. tff.learning.framework.build_encoded_broadcast_from_model
및 tff.learning.framework.build_encoded_mean_from_model
을 사용하여 tff.learning.build_federated_averaging_process
의 broadcast_process
및 aggregation_process
인수로 전달될 수 있는 두 개의 함수를 생성하여 손실 압축 알고리즘으로 Federated Averaging 알고리즘을 만듭니다.
encoded_broadcast_process = (
tff.learning.framework.build_encoded_broadcast_process_from_model(
tff_model_fn, broadcast_encoder_fn))
encoded_mean_process = (
tff.learning.framework.build_encoded_mean_process_from_model(
tff_model_fn, mean_encoder_fn))
federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
broadcast_process=encoded_broadcast_process,
aggregation_process=encoded_mean_process)
모델 다시 훈련하기
이제 새로운 Federated Averaging 알고리즘을 실행해 보겠습니다.
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
logdir_for_compression)
train(federated_averaging_process=federated_averaging_with_compression,
num_rounds=10,
num_clients_per_round=10,
summary_writer=summary_writer_for_compression)
round 0, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08722109347581863,loss=2.3216357231140137>>, broadcasted_bits=146.46MiB, aggregated_bits=146.46MiB round 1, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08379272371530533,loss=2.3108291625976562>>, broadcasted_bits=292.92MiB, aggregated_bits=292.92MiB round 2, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.08834951370954514,loss=2.3074147701263428>>, broadcasted_bits=439.38MiB, aggregated_bits=439.39MiB round 3, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.10467479377985,loss=2.305814027786255>>, broadcasted_bits=585.84MiB, aggregated_bits=585.85MiB round 4, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.09853658825159073,loss=2.3012874126434326>>, broadcasted_bits=732.30MiB, aggregated_bits=732.31MiB round 5, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.14904330670833588,loss=2.3005223274230957>>, broadcasted_bits=878.77MiB, aggregated_bits=878.77MiB round 6, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13152804970741272,loss=2.2985599040985107>>, broadcasted_bits=1.00GiB, aggregated_bits=1.00GiB round 7, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12392637878656387,loss=2.297102451324463>>, broadcasted_bits=1.14GiB, aggregated_bits=1.14GiB round 8, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.13289350271224976,loss=2.2944107055664062>>, broadcasted_bits=1.29GiB, aggregated_bits=1.29GiB round 9, metrics=<broadcast=<>,aggregation=<>,train=<sparse_categorical_accuracy=0.12661737203598022,loss=2.2971296310424805>>, broadcasted_bits=1.43GiB, aggregated_bits=1.43GiB
TensorBoard를 다시 시작하여 두 실행 간의 훈련 메트릭을 비교합니다.
Tensorboard에서 볼 수 있듯이, broadcasted_bits
및 aggregated_bits
플롯에서 orginial
및 compression
곡선 간에 상당한 감소가 있는 반면 loss
및 sparse_categorical_accuracy
플롯에서는 두 곡선이 매우 유사합니다.
결론적으로, 원래의 Federated Averaging 알고리즘과 유사한 성능을 달성 할 수 있는 압축 알고리즘을 구현하면서 통신 비용은 대폭 절감했습니다.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
연습
사용자 정의 압축 알고리즘을 구현하고 훈련 루프에 적용하기 위해 다음을 수행할 수 있습니다.
- 이 예제에 따라
EncodingStageInterface
의 서브 클래스 또는 보다 일반적인 변형인AdaptiveEncodingStageInterface
로 새 압축 알고리즘을 구현합니다. - 새
Encoder
를 구성하고 모델 브로드캐스팅 또는 모델 업데이트 평균화를 위해 특수화합니다. - 이들 객체를 사용하여 전체 훈련 계산을 빌드합니다.
잠재적으로 가치 있는 개방형 연구 질문에는 불균일 양자화, 허프만 코딩과 같은 무손실 압축, 이전 훈련 라운드의 정보를 기반으로 한 압축을 조정하는 메커니즘이 포함됩니다.
권장 참고 자료:
- 클라이언트 리소스 요구 사항을 줄여 페더레이션 된 학습 범위 확장
- 연합 학습 : 커뮤니케이션 효율성 향상을위한 전략
- Advanced and Open Problems in Federated Learning의 Section 3.5 Communication and Compression