Zobacz na TensorFlow.org | Uruchom w Google Colab | Wyświetl źródło na GitHub | Pobierz notatnik |
W tym tutorialu użyjemy EMNIST zestaw danych w celu wykazania, jak włączyć algorytmy kompresji stratnej, aby obniżyć koszty komunikacji w algorytmie uśredniania Federalne używając tff.learning.build_federated_averaging_process
API i tensor_encoding API. Więcej informacji na temat algorytmu uśredniania Federalne, zobacz papieru Communication-Efektywne uczenie głębokiej sieci od zdecentralizowanej Danych .
Zanim zaczniemy
Zanim zaczniemy, wykonaj następujące czynności, aby upewnić się, że Twoje środowisko jest poprawnie skonfigurowane. Jeśli nie pojawi się powitanie, proszę odnieść się do montażu prowadnicy do instrukcji.
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te
Sprawdź, czy działa TFF.
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
b'Hello, World!'
Przygotowanie danych wejściowych
W tej sekcji ładujemy i wstępnie przetwarzamy zbiór danych EMNIST zawarty w TFF. Proszę sprawdzić Federalne nauki dla klasyfikowania Obraz samouczek dla więcej szczegółów na temat EMNIST zbiorze.
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
only_digits=True)
def reshape_emnist_element(element):
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
"""Preprocessing function for the EMNIST training dataset."""
return (dataset
# Shuffle according to the largest client dataset
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
# Repeat to do multiple local epochs
.repeat(CLIENT_EPOCHS_PER_ROUND)
# Batch to a fixed client batch size
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
# Preprocessing step
.map(reshape_emnist_element))
emnist_train = emnist_train.preprocess(preprocess_train_dataset)
Definiowanie modelu
Tutaj definiujemy model Keras oparciu o orginial FedAvg CNN, a następnie owinąć model Keras w instancji tff.learning.Model tak, że może być spożywany przez TFF.
Należy pamiętać, że musimy funkcję, która produkuje model Zamiast prostego modelu bezpośrednio. Ponadto, funkcja nie może po prostu uchwycić wcześniej zbudowanego modelu, musi stworzyć model w kontekście faktu, że jest ona wywoływana. Powodem jest to, że TFF jest zaprojektowany tak, aby trafiał do urządzeń i wymaga kontroli nad tym, kiedy zasoby są konstruowane, aby można je było przechwycić i spakować.
def create_original_fedavg_cnn_model(only_digits=True):
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
data_format = 'channels_last'
max_pool = functools.partial(
tf.keras.layers.MaxPooling2D,
pool_size=(2, 2),
padding='same',
data_format=data_format)
conv2d = functools.partial(
tf.keras.layers.Conv2D,
kernel_size=5,
padding='same',
data_format=data_format,
activation=tf.nn.relu)
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
conv2d(filters=32),
max_pool(),
conv2d(filters=64),
max_pool(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation=tf.nn.relu),
tf.keras.layers.Dense(10 if only_digits else 62),
tf.keras.layers.Softmax(),
])
return model
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0]).element_spec
def tff_model_fn():
keras_model = create_original_fedavg_cnn_model()
return tff.learning.from_keras_model(
keras_model=keras_model,
input_spec=input_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
Uczenie modelu i wyprowadzanie metryk uczących
Teraz jesteśmy gotowi do skonstruowania algorytmu uśredniania sfederowanego i trenowania zdefiniowanego modelu na zbiorze danych EMNIST.
Najpierw musimy zbudować algorytm uśredniania Federalne pomocą tff.learning.build_federated_averaging_process API.
federated_averaging = tff.learning.build_federated_averaging_process(
model_fn=tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
Teraz uruchommy algorytm uśredniania sfederowanego. Wykonanie algorytmu Federated Learning z perspektywy TFF wygląda tak:
- Zainicjuj algorytm i uzyskaj początkowy stan serwera. Stan serwera zawiera niezbędne informacje do wykonania algorytmu. Przypomnijmy, że ponieważ TFF działa, stan ten obejmuje zarówno dowolny stan optymalizatora używany przez algorytm (np. warunki pędu), jak i same parametry modelu — zostaną one przekazane jako argumenty i zwrócone jako wyniki obliczeń TFF.
- Wykonaj algorytm runda po rundzie. W każdej rundzie nowy stan serwera zostanie zwrócony w wyniku uczenia modelu przez każdego klienta na jego danych. Zazwyczaj w jednej rundzie:
- Serwer rozgłasza model do wszystkich uczestniczących klientów.
- Każdy klient wykonuje pracę w oparciu o model i własne dane.
- Serwer agreguje wszystkie modele, aby utworzyć stan serwera, który zawiera nowy model.
Aby uzyskać więcej informacji, proszę zobaczyć niestandardowy Federalne algorytmów, część 2: Wdrażanie Federalne uśredniania samouczek.
Metryki treningowe są zapisywane w katalogu Tensorboard w celu wyświetlenia po treningu.
Załaduj funkcje użytkowe
def format_size(size):
"""A helper function for creating a human-readable size."""
size = float(size)
for unit in ['bit','Kibit','Mibit','Gibit']:
if size < 1024.0:
return "{size:3.2f}{unit}".format(size=size, unit=unit)
size /= 1024.0
return "{size:.2f}{unit}".format(size=size, unit='TiB')
def set_sizing_environment():
"""Creates an environment that contains sizing information."""
# Creates a sizing executor factory to output communication cost
# after the training finishes. Note that sizing executor only provides an
# estimate (not exact) of communication cost, and doesn't capture cases like
# compression of over-the-wire representations. However, it's perfect for
# demonstrating the effect of compression in this tutorial.
sizing_factory = tff.framework.sizing_executor_factory()
# TFF has a modular runtime you can configure yourself for various
# environments and purposes, and this example just shows how to configure one
# part of it to report the size of things.
context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
tff.framework.set_default_context(context)
return sizing_factory
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
"""Trains the federated averaging process and output metrics."""
# Create a environment to get communication cost.
environment = set_sizing_environment()
# Initialize the Federated Averaging algorithm to get the initial server state.
state = federated_averaging_process.initialize()
with summary_writer.as_default():
for round_num in range(num_rounds):
# Sample the clients parcitipated in this round.
sampled_clients = np.random.choice(
emnist_train.client_ids,
size=num_clients_per_round,
replace=False)
# Create a list of `tf.Dataset` instances from the data of sampled clients.
sampled_train_data = [
emnist_train.create_tf_dataset_for_client(client)
for client in sampled_clients
]
# Round one round of the algorithm based on the server state and client data
# and output the new state and metrics.
state, metrics = federated_averaging_process.next(state, sampled_train_data)
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
size_info = environment.get_size_info()
broadcasted_bits = size_info.broadcast_bits[-1]
aggregated_bits = size_info.aggregate_bits[-1]
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))
# Add metrics to Tensorboard.
for name, value in metrics['train'].items():
tf.summary.scalar(name, value, step=round_num)
# Add broadcasted and aggregated data size to Tensorboard.
tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
pass # Path doesn't exist
# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
num_clients_per_round=10, summary_writer=summary_writer)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit
Uruchom TensorBoard z głównym katalogiem dziennika określonym powyżej, aby wyświetlić metryki uczenia. Załadowanie danych może potrwać kilka sekund. Poza stratami i dokładnością wyprowadzamy również ilość danych nadawanych i zagregowanych. Dane rozgłaszane odnoszą się do tensorów, które serwer przesyła do każdego klienta, podczas gdy dane zagregowane odnoszą się do tensorów, które każdy klient zwraca na serwer.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9135ef1630>
Zbuduj niestandardową funkcję transmisji i agregacji
Teraz funkcję wdrożyć w użyciu stratnej algorytmy kompresji danych na temat nadawanych i sumaryczne dane z wykorzystaniem tensor_encoding API.
Najpierw definiujemy dwie funkcje:
-
broadcast_encoder_fn
który tworzy instancję te.core.SimpleEncoder do tensorów kodują lub zmiennych serwer do komunikacji klient (dane Broadcast). -
mean_encoder_fn
który tworzy instancję te.core.GatherEncoder do tensorów kodują lub zmiennych w klienta do serwera communicaiton (dane Aggregation).
Należy zauważyć, że nie stosujemy metody kompresji do całego modelu na raz. Zamiast tego decydujemy, jak (i czy) niezależnie kompresować każdą zmienną modelu. Powodem jest to, że ogólnie małe zmienne, takie jak błędy systematyczne, są bardziej wrażliwe na niedokładności, a ponieważ są stosunkowo małe, potencjalne oszczędności w zakresie komunikacji są również stosunkowo niewielkie. Dlatego domyślnie nie kompresujemy małych zmiennych. W tym przykładzie stosujemy jednolitą kwantyzację do 8 bitów (256 segmentów) do każdej zmiennej zawierającej więcej niż 10000 elementów, a identyczność stosujemy tylko do innych zmiennych.
def broadcast_encoder_fn(value):
"""Function for building encoded broadcast."""
spec = tf.TensorSpec(value.shape, value.dtype)
if value.shape.num_elements() > 10000:
return te.encoders.as_simple_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)
def mean_encoder_fn(tensor_spec):
"""Function for building a GatherEncoder."""
spec = tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
if tensor_spec.shape.num_elements() > 10000:
return te.encoders.as_gather_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)
TFF udostępnia API do konwersji funkcję nadajnika do formatu tff.learning.build_federated_averaging_process
API może konsumować. Korzystając z tff.learning.framework.build_encoded_broadcast_from_model
i tff.aggregators.MeanFactory
, możemy stworzyć dwa obiekty, które mogą być przekazywane do broadcast_process
i model_update_aggregation_factory
agruments z tff.learning.build_federated_averaging_process
stworzyć stowarzyszonego uśredniania algorytmów z algorytmu kompresji stratnej.
encoded_broadcast_process = (
tff.learning.framework.build_encoded_broadcast_process_from_model(
tff_model_fn, broadcast_encoder_fn))
mean_factory = tff.aggregators.MeanFactory(
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)
federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
broadcast_process=encoded_broadcast_process,
model_update_aggregation_factory=mean_factory)
Ponowne szkolenie modelu
Teraz uruchommy nowy algorytm uśredniania sfederowanego.
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
logdir_for_compression)
train(federated_averaging_process=federated_averaging_with_compression,
num_rounds=10,
num_clients_per_round=10,
summary_writer=summary_writer_for_compression)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit
Uruchom ponownie TensorBoard, aby porównać dane treningowe między dwoma przebiegami.
Jak widać w Tensorboard, istnieje znaczna redukcja między orginial
i compression
krzywych w broadcasted_bits
i aggregated_bits
działek aw loss
i sparse_categorical_accuracy
działce dwie krzywe są dość podobna.
Podsumowując, wdrożyliśmy algorytm kompresji, który może osiągnąć podobną wydajność jak oryginalny algorytm uśredniania sfederowanego, przy czym koszt komunikacji jest znacznie obniżony.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9140eb5ef0>
Ćwiczenia
Aby zaimplementować niestandardowy algorytm kompresji i zastosować go do pętli treningowej, możesz:
- Wdrożenie nowego algorytmu kompresji jako podklasa
EncodingStageInterface
lub jego bardziej ogólnym wariancieAdaptiveEncodingStageInterface
poniższym przykładzie . - Skonstruować nowy
Encoder
i specjalizują się go do modelu transmisji lub modelu aktualizacji uśredniania . - Korzystania z tych obiektów, aby zbudować całą obliczeń treningowy .
Potencjalnie cenne otwarte pytania badawcze obejmują: niejednolitą kwantyzację, kompresję bezstratną, taką jak kodowanie Huffmana, oraz mechanizmy adaptacji kompresji na podstawie informacji z poprzednich rund szkoleniowych.
Zalecane lektury: