Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
Dalam tutorial ini, kita menggunakan EMNIST dataset untuk menunjukkan cara mengaktifkan algoritma kompresi lossy untuk mengurangi biaya komunikasi dalam algoritma Averaging Federasi menggunakan tff.learning.build_federated_averaging_process
API dan tensor_encoding API. Untuk rincian lebih lanjut tentang algoritma Averaging Federasi, lihat kertas Komunikasi-Efisien Belajar Deep Networks dari data Desentralisasi .
Sebelum kita mulai
Sebelum kita mulai, jalankan yang berikut ini untuk memastikan bahwa lingkungan Anda telah diatur dengan benar. Jika Anda tidak melihat salam, silakan merujuk ke Instalasi panduan untuk petunjuk.
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te
Verifikasi apakah TFF berfungsi.
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
b'Hello, World!'
Menyiapkan data masukan
Di bagian ini kami memuat dan memproses dataset EMNIST yang disertakan dalam TFF. Silakan periksa Federasi Belajar untuk Gambar Klasifikasi tutorial untuk rincian lebih lanjut tentang EMNIST dataset.
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
only_digits=True)
def reshape_emnist_element(element):
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
"""Preprocessing function for the EMNIST training dataset."""
return (dataset
# Shuffle according to the largest client dataset
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
# Repeat to do multiple local epochs
.repeat(CLIENT_EPOCHS_PER_ROUND)
# Batch to a fixed client batch size
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
# Preprocessing step
.map(reshape_emnist_element))
emnist_train = emnist_train.preprocess(preprocess_train_dataset)
Mendefinisikan model
Di sini kita menentukan model keras berdasarkan orginial FedAvg CNN, dan kemudian membungkus model keras di sebuah contoh dari tff.learning.Model sehingga dapat dikonsumsi oleh TFF.
Perhatikan bahwa kita akan membutuhkan fungsi yang menghasilkan model bukan hanya model langsung. Selain itu, fungsi tidak bisa hanya menangkap model pra-dibangun, harus membuat model dalam konteks yang disebut. Alasannya adalah bahwa TFF dirancang untuk masuk ke perangkat, dan memerlukan kontrol saat sumber daya dibangun sehingga dapat ditangkap dan dikemas.
def create_original_fedavg_cnn_model(only_digits=True):
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
data_format = 'channels_last'
max_pool = functools.partial(
tf.keras.layers.MaxPooling2D,
pool_size=(2, 2),
padding='same',
data_format=data_format)
conv2d = functools.partial(
tf.keras.layers.Conv2D,
kernel_size=5,
padding='same',
data_format=data_format,
activation=tf.nn.relu)
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
conv2d(filters=32),
max_pool(),
conv2d(filters=64),
max_pool(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation=tf.nn.relu),
tf.keras.layers.Dense(10 if only_digits else 62),
tf.keras.layers.Softmax(),
])
return model
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0]).element_spec
def tff_model_fn():
keras_model = create_original_fedavg_cnn_model()
return tff.learning.from_keras_model(
keras_model=keras_model,
input_spec=input_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
Melatih model dan menghasilkan metrik pelatihan
Sekarang kita siap untuk membuat algoritma Federated Averaging dan melatih model yang ditentukan pada dataset EMNIST.
Pertama kita perlu membangun algoritma Averaging Federated menggunakan tff.learning.build_federated_averaging_process API.
federated_averaging = tff.learning.build_federated_averaging_process(
model_fn=tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
Sekarang mari kita jalankan algoritma Federated Averaging. Eksekusi algoritma Federated Learning dari perspektif TFF terlihat seperti ini:
- Inisialisasi algoritme dan dapatkan status server awal. Status server berisi informasi yang diperlukan untuk menjalankan algoritme. Ingat, karena TFF berfungsi, bahwa status ini mencakup status pengoptimal apa pun yang digunakan algoritme (misalnya istilah momentum) serta parameter model itu sendiri--ini akan diteruskan sebagai argumen dan dikembalikan sebagai hasil dari perhitungan TFF.
- Jalankan algoritma putaran demi putaran. Di setiap putaran, status server baru akan dikembalikan sebagai hasil dari setiap klien melatih model pada datanya. Biasanya dalam satu putaran:
- Server menyiarkan model ke semua klien yang berpartisipasi.
- Setiap klien melakukan pekerjaan berdasarkan model dan datanya sendiri.
- Server menggabungkan semua model untuk menghasilkan status pemutusan yang berisi model baru.
Untuk lebih jelasnya, silakan lihat Kustom Federasi Algoritma, Bagian 2: Pelaksana Federasi Averaging tutorial.
Metrik pelatihan ditulis ke direktori Tensorboard untuk ditampilkan setelah pelatihan.
Memuat fungsi utilitas
def format_size(size):
"""A helper function for creating a human-readable size."""
size = float(size)
for unit in ['bit','Kibit','Mibit','Gibit']:
if size < 1024.0:
return "{size:3.2f}{unit}".format(size=size, unit=unit)
size /= 1024.0
return "{size:.2f}{unit}".format(size=size, unit='TiB')
def set_sizing_environment():
"""Creates an environment that contains sizing information."""
# Creates a sizing executor factory to output communication cost
# after the training finishes. Note that sizing executor only provides an
# estimate (not exact) of communication cost, and doesn't capture cases like
# compression of over-the-wire representations. However, it's perfect for
# demonstrating the effect of compression in this tutorial.
sizing_factory = tff.framework.sizing_executor_factory()
# TFF has a modular runtime you can configure yourself for various
# environments and purposes, and this example just shows how to configure one
# part of it to report the size of things.
context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
tff.framework.set_default_context(context)
return sizing_factory
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
"""Trains the federated averaging process and output metrics."""
# Create a environment to get communication cost.
environment = set_sizing_environment()
# Initialize the Federated Averaging algorithm to get the initial server state.
state = federated_averaging_process.initialize()
with summary_writer.as_default():
for round_num in range(num_rounds):
# Sample the clients parcitipated in this round.
sampled_clients = np.random.choice(
emnist_train.client_ids,
size=num_clients_per_round,
replace=False)
# Create a list of `tf.Dataset` instances from the data of sampled clients.
sampled_train_data = [
emnist_train.create_tf_dataset_for_client(client)
for client in sampled_clients
]
# Round one round of the algorithm based on the server state and client data
# and output the new state and metrics.
state, metrics = federated_averaging_process.next(state, sampled_train_data)
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
size_info = environment.get_size_info()
broadcasted_bits = size_info.broadcast_bits[-1]
aggregated_bits = size_info.aggregate_bits[-1]
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))
# Add metrics to Tensorboard.
for name, value in metrics['train'].items():
tf.summary.scalar(name, value, step=round_num)
# Add broadcasted and aggregated data size to Tensorboard.
tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
pass # Path doesn't exist
# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
num_clients_per_round=10, summary_writer=summary_writer)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit
Mulai TensorBoard dengan direktori root log yang ditentukan di atas untuk menampilkan metrik pelatihan. Diperlukan beberapa detik untuk memuat data. Kecuali untuk Loss and Accuracy, kami juga menampilkan jumlah data yang disiarkan dan dikumpulkan. Data yang disiarkan mengacu pada tensor yang didorong server ke setiap klien sementara data agregat mengacu pada tensor yang dikembalikan oleh setiap klien ke server.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9135ef1630>
Bangun siaran khusus dan fungsi agregat
Sekarang mari kita menerapkan fungsi untuk menggunakan algoritma kompresi lossy data disiarkan dan data dikumpulkan dengan menggunakan tensor_encoding API.
Pertama, kita mendefinisikan dua fungsi:
-
broadcast_encoder_fn
yang menciptakan sebuah instance dari te.core.SimpleEncoder untuk tensor encode atau variabel dalam server untuk komunikasi client (data Broadcast). -
mean_encoder_fn
yang menciptakan sebuah instance dari te.core.GatherEncoder untuk tensor encode atau variabel di klien ke server communicaiton (data Agregasi).
Penting untuk dicatat bahwa kami tidak menerapkan metode kompresi ke seluruh model sekaligus. Sebagai gantinya, kami memutuskan bagaimana (dan apakah) mengompresi setiap variabel model secara independen. Alasannya adalah bahwa secara umum, variabel kecil seperti bias lebih sensitif terhadap ketidakakuratan, dan karena relatif kecil, potensi penghematan komunikasi juga relatif kecil. Oleh karena itu kami tidak mengompresi variabel kecil secara default. Dalam contoh ini, kami menerapkan kuantisasi seragam ke 8 bit (256 ember) untuk setiap variabel dengan lebih dari 10.000 elemen, dan hanya menerapkan identitas ke variabel lain.
def broadcast_encoder_fn(value):
"""Function for building encoded broadcast."""
spec = tf.TensorSpec(value.shape, value.dtype)
if value.shape.num_elements() > 10000:
return te.encoders.as_simple_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)
def mean_encoder_fn(tensor_spec):
"""Function for building a GatherEncoder."""
spec = tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
if tensor_spec.shape.num_elements() > 10000:
return te.encoders.as_gather_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)
TFF menyediakan API untuk mengkonversi fungsi encoder ke dalam format yang tff.learning.build_federated_averaging_process
API dapat mengkonsumsi. Dengan menggunakan tff.learning.framework.build_encoded_broadcast_from_model
dan tff.aggregators.MeanFactory
, kita bisa membuat dua benda yang bisa masuk ke broadcast_process
dan model_update_aggregation_factory
agruments dari tff.learning.build_federated_averaging_process
untuk membuat Federasi Averaging algoritma dengan algoritma kompresi lossy.
encoded_broadcast_process = (
tff.learning.framework.build_encoded_broadcast_process_from_model(
tff_model_fn, broadcast_encoder_fn))
mean_factory = tff.aggregators.MeanFactory(
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)
federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
broadcast_process=encoded_broadcast_process,
model_update_aggregation_factory=mean_factory)
Melatih model lagi
Sekarang mari kita jalankan algoritma Federated Averaging yang baru.
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
logdir_for_compression)
train(federated_averaging_process=federated_averaging_with_compression,
num_rounds=10,
num_clients_per_round=10,
summary_writer=summary_writer_for_compression)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit
Mulai TensorBoard lagi untuk membandingkan metrik pelatihan antara dua proses.
Seperti yang Anda lihat di Tensorboard, ada penurunan yang signifikan antara orginial
dan compression
kurva di broadcasted_bits
dan aggregated_bits
plot sementara di loss
dan sparse_categorical_accuracy
plot dua kurva yang cukup mirip.
Sebagai kesimpulan, kami menerapkan algoritma kompresi yang dapat mencapai kinerja yang sama seperti algoritma Federated Averaging asli sementara biaya komunikasi berkurang secara signifikan.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9140eb5ef0>
Latihan
Untuk menerapkan algoritme kompresi khusus dan menerapkannya ke loop pelatihan, Anda dapat:
- Mengimplementasikan algoritma kompresi baru sebagai subclass dari
EncodingStageInterface
atau varian yang lebih umum,AdaptiveEncodingStageInterface
mengikuti contoh ini . - Membangun baru Anda
Encoder
dan mengkhususkan untuk Model siaran atau model update averaging . - Gunakan benda-benda untuk membangun seluruh perhitungan pelatihan .
Pertanyaan penelitian terbuka yang berpotensi berharga meliputi: kuantisasi tidak seragam, kompresi lossless seperti pengkodean huffman, dan mekanisme untuk mengadaptasi kompresi berdasarkan informasi dari putaran pelatihan sebelumnya.
Bahan bacaan yang direkomendasikan: