Ver no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Neste tutorial, vamos usar o EMNIST conjunto de dados para demonstrar como habilitar algoritmos de compressão com perdas para reduzir o custo de comunicação no algoritmo Média Federated usando o tff.learning.build_federated_averaging_process
API eo tensor_encoding API. Para mais detalhes sobre o algoritmo Média Federados, ver o papel de Aprendizagem Comunicação Eficiente de profunda Redes de Descentralizada de Dados .
Antes de começarmos
Antes de começar, execute o seguinte para se certificar de que seu ambiente está configurado corretamente. Se você não vê uma saudação, consulte a instalação guia para obter instruções.
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te
Verifique se o TFF está funcionando.
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
b'Hello, World!'
Preparando os dados de entrada
Nesta seção, carregamos e pré-processamos o conjunto de dados EMNIST incluído no TFF. Por favor, confira Federated Aprendizagem para Imagem Classification tutorial para mais detalhes sobre EMNIST conjunto de dados.
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
only_digits=True)
def reshape_emnist_element(element):
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
"""Preprocessing function for the EMNIST training dataset."""
return (dataset
# Shuffle according to the largest client dataset
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
# Repeat to do multiple local epochs
.repeat(CLIENT_EPOCHS_PER_ROUND)
# Batch to a fixed client batch size
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
# Preprocessing step
.map(reshape_emnist_element))
emnist_train = emnist_train.preprocess(preprocess_train_dataset)
Definindo um modelo
Aqui nós definimos um modelo keras baseado no orginial FedAvg CNN, e em seguida, enrole o modelo keras em uma instância do tff.learning.Model para que ele possa ser consumido por TFF.
Note que vamos precisar de uma função que produz um modelo em vez de simplesmente um modelo diretamente. Além disso, a função não pode simplesmente capturar um modelo pré-construído, ele deve criar o modelo no contexto de que ele é chamado. O motivo é que o TFF foi projetado para ir para dispositivos e precisa de controle sobre quando os recursos são construídos para que possam ser capturados e empacotados.
def create_original_fedavg_cnn_model(only_digits=True):
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
data_format = 'channels_last'
max_pool = functools.partial(
tf.keras.layers.MaxPooling2D,
pool_size=(2, 2),
padding='same',
data_format=data_format)
conv2d = functools.partial(
tf.keras.layers.Conv2D,
kernel_size=5,
padding='same',
data_format=data_format,
activation=tf.nn.relu)
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
conv2d(filters=32),
max_pool(),
conv2d(filters=64),
max_pool(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation=tf.nn.relu),
tf.keras.layers.Dense(10 if only_digits else 62),
tf.keras.layers.Softmax(),
])
return model
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0]).element_spec
def tff_model_fn():
keras_model = create_original_fedavg_cnn_model()
return tff.learning.from_keras_model(
keras_model=keras_model,
input_spec=input_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
Treinar o modelo e gerar métricas de treinamento
Agora estamos prontos para construir um algoritmo de Média Federada e treinar o modelo definido no conjunto de dados EMNIST.
Primeiro, precisamos construir um algoritmo Média Federated usando o tff.learning.build_federated_averaging_process API.
federated_averaging = tff.learning.build_federated_averaging_process(
model_fn=tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
Agora vamos executar o algoritmo de Média Federada. A execução de um algoritmo de Aprendizado Federado da perspectiva do TFF é assim:
- Inicialize o algoritmo e obtenha o estado inicial do servidor. O estado do servidor contém as informações necessárias para executar o algoritmo. Lembre-se, uma vez que o TFF é funcional, que este estado inclui qualquer estado do otimizador que o algoritmo usa (por exemplo, termos de momentum), bem como os próprios parâmetros do modelo - estes serão passados como argumentos e retornados como resultados de cálculos de TFF.
- Execute o algoritmo rodada a rodada. Em cada rodada, um novo estado do servidor será retornado como resultado de cada cliente treinando o modelo em seus dados. Normalmente em uma rodada:
- O servidor transmite o modelo para todos os clientes participantes.
- Cada cliente realiza um trabalho com base no modelo e em seus próprios dados.
- O servidor agrega todo o modelo para produzir um estado de servidor que contém um novo modelo.
Para mais detalhes, consulte personalizado Federados Algoritmos, Part 2: Implementando Federated Média tutorial.
As métricas de treinamento são gravadas no diretório Tensorboard para exibição após o treinamento.
Carregar funções utilitárias
def format_size(size):
"""A helper function for creating a human-readable size."""
size = float(size)
for unit in ['bit','Kibit','Mibit','Gibit']:
if size < 1024.0:
return "{size:3.2f}{unit}".format(size=size, unit=unit)
size /= 1024.0
return "{size:.2f}{unit}".format(size=size, unit='TiB')
def set_sizing_environment():
"""Creates an environment that contains sizing information."""
# Creates a sizing executor factory to output communication cost
# after the training finishes. Note that sizing executor only provides an
# estimate (not exact) of communication cost, and doesn't capture cases like
# compression of over-the-wire representations. However, it's perfect for
# demonstrating the effect of compression in this tutorial.
sizing_factory = tff.framework.sizing_executor_factory()
# TFF has a modular runtime you can configure yourself for various
# environments and purposes, and this example just shows how to configure one
# part of it to report the size of things.
context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
tff.framework.set_default_context(context)
return sizing_factory
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
"""Trains the federated averaging process and output metrics."""
# Create a environment to get communication cost.
environment = set_sizing_environment()
# Initialize the Federated Averaging algorithm to get the initial server state.
state = federated_averaging_process.initialize()
with summary_writer.as_default():
for round_num in range(num_rounds):
# Sample the clients parcitipated in this round.
sampled_clients = np.random.choice(
emnist_train.client_ids,
size=num_clients_per_round,
replace=False)
# Create a list of `tf.Dataset` instances from the data of sampled clients.
sampled_train_data = [
emnist_train.create_tf_dataset_for_client(client)
for client in sampled_clients
]
# Round one round of the algorithm based on the server state and client data
# and output the new state and metrics.
state, metrics = federated_averaging_process.next(state, sampled_train_data)
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
size_info = environment.get_size_info()
broadcasted_bits = size_info.broadcast_bits[-1]
aggregated_bits = size_info.aggregate_bits[-1]
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))
# Add metrics to Tensorboard.
for name, value in metrics['train'].items():
tf.summary.scalar(name, value, step=round_num)
# Add broadcasted and aggregated data size to Tensorboard.
tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
pass # Path doesn't exist
# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
num_clients_per_round=10, summary_writer=summary_writer)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit
Inicie o TensorBoard com o diretório de registro raiz especificado acima para exibir as métricas de treinamento. O carregamento dos dados pode demorar alguns segundos. Exceto para perda e precisão, também geramos a quantidade de dados transmitidos e agregados. Os dados transmitidos se referem aos tensores que o servidor envia para cada cliente, enquanto os dados agregados se referem aos tensores que cada cliente retorna ao servidor.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9135ef1630>
Crie uma transmissão personalizada e função agregada
Agora vamos implementar a função de utilizar algoritmos de compressão com perdas sobre os dados transmitidos e os dados agregados usando o tensor_encoding API.
Primeiro, definimos duas funções:
-
broadcast_encoder_fn
que cria uma instância de te.core.SimpleEncoder para tensores codificar ou variáveis no servidor para comunicação do cliente (dados Broadcast). -
mean_encoder_fn
que cria uma instância de te.core.GatherEncoder para tensores codificar ou variáveis no cliente para communicaiton servidor (dados de agregação).
É importante observar que não aplicamos um método de compactação a todo o modelo de uma vez. Em vez disso, decidimos como (e se) comprimir cada variável do modelo de forma independente. A razão é que, geralmente, pequenas variáveis, como vieses, são mais sensíveis à imprecisão e, sendo relativamente pequenas, a economia potencial de comunicação também é relativamente pequena. Portanto, não compactamos pequenas variáveis por padrão. Neste exemplo, aplicamos quantização uniforme a 8 bits (256 buckets) para cada variável com mais de 10.000 elementos e apenas aplicamos identidade a outras variáveis.
def broadcast_encoder_fn(value):
"""Function for building encoded broadcast."""
spec = tf.TensorSpec(value.shape, value.dtype)
if value.shape.num_elements() > 10000:
return te.encoders.as_simple_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)
def mean_encoder_fn(tensor_spec):
"""Function for building a GatherEncoder."""
spec = tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
if tensor_spec.shape.num_elements() > 10000:
return te.encoders.as_gather_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)
TFF fornece APIs para converter a função de codificador em um formato que tff.learning.build_federated_averaging_process
API pode consumir. Ao utilizar o tff.learning.framework.build_encoded_broadcast_from_model
e tff.aggregators.MeanFactory
, podemos criar dois objetos que podem ser passados para broadcast_process
e model_update_aggregation_factory
agruments de tff.learning.build_federated_averaging_process
para criar um federados de Média algoritmos com um algoritmo de compressão lossy.
encoded_broadcast_process = (
tff.learning.framework.build_encoded_broadcast_process_from_model(
tff_model_fn, broadcast_encoder_fn))
mean_factory = tff.aggregators.MeanFactory(
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)
federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
broadcast_process=encoded_broadcast_process,
model_update_aggregation_factory=mean_factory)
Treinando o modelo novamente
Agora vamos executar o novo algoritmo de Média Federada.
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
logdir_for_compression)
train(federated_averaging_process=federated_averaging_with_compression,
num_rounds=10,
num_clients_per_round=10,
summary_writer=summary_writer_for_compression)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit
Inicie o TensorBoard novamente para comparar as métricas de treinamento entre duas execuções.
Como você pode ver na Tensorboard, há uma redução significativa entre os orginial
e compression
curvas nos broadcasted_bits
e aggregated_bits
parcelas enquanto na loss
e sparse_categorical_accuracy
trama as duas curvas são bastante semelhante.
Em conclusão, implementamos um algoritmo de compressão que pode atingir desempenho semelhante ao algoritmo de Média Federada original enquanto o custo de cominucação é significativamente reduzido.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9140eb5ef0>
Exercícios
Para implementar um algoritmo de compressão personalizado e aplicá-lo ao loop de treinamento, você pode:
- Implementar um novo algoritmo de compressão tal como uma subclasse de
EncodingStageInterface
ou a sua variante mais geral,AdaptiveEncodingStageInterface
seguinte deste exemplo . - Construa o seu novo
Encoder
e especializar-lo para modelo de transmissão ou modelo de atualização média . - Use esses objetos para construir toda a computação formação .
As perguntas abertas de pesquisa potencialmente valiosas incluem: quantização não uniforme, compressão sem perdas, como codificação huffman, e mecanismos para adaptar a compressão com base nas informações de rodadas de treinamento anteriores.
Materiais de leitura recomendados: