ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ในการกวดวิชานี้เราจะใช้ EMNIST ชุดข้อมูลที่แสดงให้เห็นถึงวิธีการเปิดใช้อัลกอริทึมการบีบอัด lossy เพื่อลดค่าใช้จ่ายในการติดต่อสื่อสารในขั้นตอนวิธีการ Averaging สหพันธ์ใช้ tff.learning.build_federated_averaging_process
API และ tensor_encoding API สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับขั้นตอนวิธีการ Averaging สหพันธ์ดูกระดาษ การเรียนรู้การสื่อสารที่มีประสิทธิภาพของเครือข่ายการกระจายอำนาจลึกจากข้อมูล
ก่อนที่เราจะเริ่มต้น
ก่อนที่เราจะเริ่ม โปรดเรียกใช้สิ่งต่อไปนี้เพื่อให้แน่ใจว่าสภาพแวดล้อมของคุณได้รับการตั้งค่าอย่างถูกต้อง หากคุณไม่เห็นคำทักทายโปรดดูที่ การติดตั้ง คู่มือสำหรับคำแนะนำ
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te
ตรวจสอบว่า TFF ทำงานหรือไม่
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
b'Hello, World!'
การเตรียมข้อมูลเข้า
ในส่วนนี้ เราจะโหลดและประมวลผลชุดข้อมูล EMNIST ที่รวมอยู่ใน TFF ล่วงหน้า กรุณาตรวจสอบ สหพันธ์เรียนรู้สำหรับภาพการจำแนกประเภท กวดวิชาสำหรับรายละเอียดเพิ่มเติมเกี่ยวกับชุด EMNIST
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
only_digits=True)
def reshape_emnist_element(element):
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
"""Preprocessing function for the EMNIST training dataset."""
return (dataset
# Shuffle according to the largest client dataset
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
# Repeat to do multiple local epochs
.repeat(CLIENT_EPOCHS_PER_ROUND)
# Batch to a fixed client batch size
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
# Preprocessing step
.map(reshape_emnist_element))
emnist_train = emnist_train.preprocess(preprocess_train_dataset)
การกำหนดแบบจำลอง
ที่นี่เรากำหนดรูปแบบ keras บนพื้นฐานของ orginial FedAvg ซีเอ็นเอ็นและแล้วห่อรุ่น keras ในตัวอย่างของ tff.learning.Model เพื่อที่จะสามารถนำมาบริโภคโดยฉิบหาย
โปรดทราบว่าเราจะต้องฟังก์ชั่นซึ่งเป็นผู้ผลิตแบบแทนที่จะรูปแบบโดยตรง นอกจากนี้ฟังก์ชั่นไม่สามารถเพียงแค่จับรุ่นก่อนสร้างขึ้นนั้นจะต้องสร้างแบบจำลองในบริบทที่ว่ามันจะเรียกว่า เหตุผลก็คือ TFF ได้รับการออกแบบมาให้ใช้กับอุปกรณ์ และจำเป็นต้องควบคุมเมื่อทรัพยากรถูกสร้างขึ้นเพื่อให้สามารถดักจับและบรรจุหีบห่อได้
def create_original_fedavg_cnn_model(only_digits=True):
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
data_format = 'channels_last'
max_pool = functools.partial(
tf.keras.layers.MaxPooling2D,
pool_size=(2, 2),
padding='same',
data_format=data_format)
conv2d = functools.partial(
tf.keras.layers.Conv2D,
kernel_size=5,
padding='same',
data_format=data_format,
activation=tf.nn.relu)
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
conv2d(filters=32),
max_pool(),
conv2d(filters=64),
max_pool(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation=tf.nn.relu),
tf.keras.layers.Dense(10 if only_digits else 62),
tf.keras.layers.Softmax(),
])
return model
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0]).element_spec
def tff_model_fn():
keras_model = create_original_fedavg_cnn_model()
return tff.learning.from_keras_model(
keras_model=keras_model,
input_spec=input_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
การฝึกอบรมแบบจำลองและการส่งออกตัวชี้วัดการฝึกอบรม
ตอนนี้ เราพร้อมที่จะสร้างอัลกอริธึม Federated Averaging และฝึกโมเดลที่กำหนดไว้บนชุดข้อมูล EMNIST
ครั้งแรกที่เราจำเป็นต้องสร้างอัลกอริทึม Averaging สหพันธ์ใช้ tff.learning.build_federated_averaging_process API
federated_averaging = tff.learning.build_federated_averaging_process(
model_fn=tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
ตอนนี้ มาเรียกใช้อัลกอริทึม Federated Averaging กัน การดำเนินการของอัลกอริธึม Federated Learning จากมุมมองของ TFF มีลักษณะดังนี้:
- เริ่มต้นอัลกอริทึมและรับสถานะเซิร์ฟเวอร์เริ่มต้น สถานะเซิร์ฟเวอร์มีข้อมูลที่จำเป็นในการดำเนินการอัลกอริทึม โปรดจำไว้ว่า เนื่องจาก TFF ใช้งานได้ สถานะนี้รวมทั้งสถานะของตัวเพิ่มประสิทธิภาพใดๆ ที่อัลกอริธึมใช้ (เช่น เงื่อนไขโมเมนตัม) เช่นเดียวกับพารามิเตอร์ของโมเดลด้วยตัวมันเอง -- สิ่งเหล่านี้จะถูกส่งผ่านเป็นอาร์กิวเมนต์และส่งคืนเป็นผลจากการคำนวณ TFF
- ดำเนินการอัลกอริทึมทีละรอบ ในแต่ละรอบ สถานะเซิร์ฟเวอร์ใหม่จะถูกส่งกลับอันเป็นผลมาจากการที่ไคลเอ็นต์แต่ละรายฝึกโมเดลเกี่ยวกับข้อมูลของตน โดยปกติในรอบเดียว:
- เซิร์ฟเวอร์เผยแพร่แบบจำลองให้กับลูกค้าที่เข้าร่วมทั้งหมด
- ลูกค้าแต่ละรายทำงานตามแบบจำลองและข้อมูลของตัวเอง
- เซิร์ฟเวอร์รวมโมเดลทั้งหมดเพื่อสร้างสถานะเซิร์ฟเวอร์ซึ่งมีโมเดลใหม่
สำหรับรายละเอียดเพิ่มเติมโปรดดู ที่กำหนดเองสหพันธ์อัลกอริทึม, ส่วนที่ 2: การใช้สหพันธ์ Averaging กวดวิชา
เมตริกการฝึกอบรมจะถูกเขียนลงในไดเร็กทอรี Tensorboard เพื่อแสดงหลังการฝึกอบรม
โหลดฟังก์ชั่นยูทิลิตี้
def format_size(size):
"""A helper function for creating a human-readable size."""
size = float(size)
for unit in ['bit','Kibit','Mibit','Gibit']:
if size < 1024.0:
return "{size:3.2f}{unit}".format(size=size, unit=unit)
size /= 1024.0
return "{size:.2f}{unit}".format(size=size, unit='TiB')
def set_sizing_environment():
"""Creates an environment that contains sizing information."""
# Creates a sizing executor factory to output communication cost
# after the training finishes. Note that sizing executor only provides an
# estimate (not exact) of communication cost, and doesn't capture cases like
# compression of over-the-wire representations. However, it's perfect for
# demonstrating the effect of compression in this tutorial.
sizing_factory = tff.framework.sizing_executor_factory()
# TFF has a modular runtime you can configure yourself for various
# environments and purposes, and this example just shows how to configure one
# part of it to report the size of things.
context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
tff.framework.set_default_context(context)
return sizing_factory
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
"""Trains the federated averaging process and output metrics."""
# Create a environment to get communication cost.
environment = set_sizing_environment()
# Initialize the Federated Averaging algorithm to get the initial server state.
state = federated_averaging_process.initialize()
with summary_writer.as_default():
for round_num in range(num_rounds):
# Sample the clients parcitipated in this round.
sampled_clients = np.random.choice(
emnist_train.client_ids,
size=num_clients_per_round,
replace=False)
# Create a list of `tf.Dataset` instances from the data of sampled clients.
sampled_train_data = [
emnist_train.create_tf_dataset_for_client(client)
for client in sampled_clients
]
# Round one round of the algorithm based on the server state and client data
# and output the new state and metrics.
state, metrics = federated_averaging_process.next(state, sampled_train_data)
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
size_info = environment.get_size_info()
broadcasted_bits = size_info.broadcast_bits[-1]
aggregated_bits = size_info.aggregate_bits[-1]
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))
# Add metrics to Tensorboard.
for name, value in metrics['train'].items():
tf.summary.scalar(name, value, step=round_num)
# Add broadcasted and aggregated data size to Tensorboard.
tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
pass # Path doesn't exist
# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
num_clients_per_round=10, summary_writer=summary_writer)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit
เริ่ม TensorBoard ด้วยไดเร็กทอรีบันทึกของรูทที่ระบุด้านบนเพื่อแสดงเมตริกการฝึกอบรม อาจใช้เวลาสองสามวินาทีในการโหลดข้อมูล ยกเว้นการสูญเสียและความแม่นยำ เรายังส่งออกจำนวนข้อมูลที่ออกอากาศและรวม ข้อมูลที่ออกอากาศหมายถึงเทนเซอร์ที่เซิร์ฟเวอร์ส่งไปยังแต่ละไคลเอ็นต์ ในขณะที่ข้อมูลที่รวมหมายถึงเมตริกซ์ที่ไคลเอ็นต์แต่ละรายส่งคืนไปยังเซิร์ฟเวอร์
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9135ef1630>
สร้างฟังก์ชันการออกอากาศและการรวมแบบกำหนดเอง
ตอนนี้ขอใช้ฟังก์ชั่นการใช้อัลกอริทึมการบีบอัด lossy กับข้อมูลการแพร่ภาพและข้อมูลที่รวบรวมโดยใช้ tensor_encoding API
ขั้นแรก เรากำหนดสองฟังก์ชัน:
-
broadcast_encoder_fn
ซึ่งจะสร้างตัวอย่างของ te.core.SimpleEncoder เพื่อเทนเซอร์เข้ารหัสหรือตัวแปรในเซิร์ฟเวอร์สื่อสารของลูกค้า (ข้อมูล Broadcast) -
mean_encoder_fn
ซึ่งจะสร้างตัวอย่างของ te.core.GatherEncoder เพื่อเทนเซอร์เข้ารหัสหรือตัวแปรในลูกค้า communicaiton เซิร์ฟเวอร์ (ข้อมูล Aggregation)
สิ่งสำคัญคือต้องทราบว่าเราไม่ได้ใช้วิธีการบีบอัดกับโมเดลทั้งหมดในคราวเดียว แต่เราตัดสินใจว่าจะบีบอัดตัวแปรแต่ละตัวของโมเดลอย่างไร (และหรือไม่) อย่างอิสระ เหตุผลก็คือว่าโดยทั่วไป ตัวแปรขนาดเล็ก เช่น ความเอนเอียง มีความอ่อนไหวต่อความไม่ถูกต้องมากกว่า และเมื่อมีขนาดค่อนข้างเล็ก ความสามารถในการประหยัดการสื่อสารก็ค่อนข้างน้อยเช่นกัน ดังนั้นเราจึงไม่บีบอัดตัวแปรขนาดเล็กโดยค่าเริ่มต้น ในตัวอย่างนี้ เราใช้การหาปริมาณแบบสม่ำเสมอกับ 8 บิต (256 บัคเก็ต) กับทุกตัวแปรที่มีองค์ประกอบมากกว่า 10,000 รายการ และใช้ข้อมูลเฉพาะตัวกับตัวแปรอื่นๆ เท่านั้น
def broadcast_encoder_fn(value):
"""Function for building encoded broadcast."""
spec = tf.TensorSpec(value.shape, value.dtype)
if value.shape.num_elements() > 10000:
return te.encoders.as_simple_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)
def mean_encoder_fn(tensor_spec):
"""Function for building a GatherEncoder."""
spec = tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
if tensor_spec.shape.num_elements() > 10000:
return te.encoders.as_gather_encoder(
te.encoders.uniform_quantization(bits=8), spec)
else:
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)
ฉิบหายให้ API เพื่อแปลงฟังก์ชั่นการเข้ารหัสในรูปแบบที่ tff.learning.build_federated_averaging_process
API สามารถใช้ โดยใช้ tff.learning.framework.build_encoded_broadcast_from_model
และ tff.aggregators.MeanFactory
เราสามารถสร้างวัตถุสองที่สามารถผ่านเข้าสู่ broadcast_process
และ model_update_aggregation_factory
agruments ของ tff.learning.build_federated_averaging_process
การสร้างอัลกอริทึมสหพันธ์เฉลี่ยด้วยวิธีการบีบอัด lossy
encoded_broadcast_process = (
tff.learning.framework.build_encoded_broadcast_process_from_model(
tff_model_fn, broadcast_encoder_fn))
mean_factory = tff.aggregators.MeanFactory(
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)
federated_averaging_with_compression = tff.learning.build_federated_averaging_process(
tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
broadcast_process=encoded_broadcast_process,
model_update_aggregation_factory=mean_factory)
ฝึกโมเดลอีกแล้ว
ตอนนี้ มาเรียกใช้อัลกอริทึม Federated Averaging ใหม่กัน
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
logdir_for_compression)
train(federated_averaging_process=federated_averaging_with_compression,
num_rounds=10,
num_clients_per_round=10,
summary_writer=summary_writer_for_compression)
round 0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit round 1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit round 2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit round 3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit round 4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit round 5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit round 6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit round 7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit round 8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit round 9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit
เริ่ม TensorBoard อีกครั้งเพื่อเปรียบเทียบเมตริกการฝึกระหว่างการวิ่งสองครั้ง
ที่คุณสามารถดูใน Tensorboard, มีการลดลงอย่างมีนัยสำคัญระหว่าง orginial
และ compression
โค้งใน broadcasted_bits
และ aggregated_bits
แปลงในขณะที่ loss
และ sparse_categorical_accuracy
พล็อตสองโค้งคล้ายสวย
โดยสรุป เราใช้อัลกอริธึมการบีบอัดที่สามารถบรรลุประสิทธิภาพที่คล้ายคลึงกันกับอัลกอริธึม Federated Averaging ดั้งเดิม ในขณะที่ต้นทุนการสื่อสารลดลงอย่างเห็นได้ชัด
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard... Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.) <IPython.core.display.Javascript at 0x7f9140eb5ef0>
การออกกำลังกาย
ในการปรับใช้อัลกอริธึมการบีบอัดแบบกำหนดเองและนำไปใช้กับลูปการฝึก คุณสามารถ:
- ใช้วิธีการบีบอัดใหม่เป็น subclass ของ
EncodingStageInterface
หรือตัวแปรทั่วไปมากขึ้นของAdaptiveEncodingStageInterface
ต่อไปนี้ ตัวอย่างนี้ - สร้างใหม่ของคุณ
Encoder
และเชี่ยวชาญมันสำหรับ รูปแบบการออกอากาศ หรือ รูปแบบการปรับปรุงค่าเฉลี่ย - ใช้วัตถุเหล่านั้นที่จะสร้างทั้ง การคำนวณการฝึกอบรม
คำถามการวิจัยแบบเปิดที่อาจมีประโยชน์ ได้แก่ การหาปริมาณที่ไม่สม่ำเสมอ การบีบอัดแบบไม่สูญเสียข้อมูล เช่น การเข้ารหัส Huffman และกลไกในการปรับการบีบอัดตามข้อมูลจากรอบการฝึกครั้งก่อน
สื่อการอ่านที่แนะนำ: