Посмотреть на TensorFlow.org | Запускаем в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
Понятие набора данных с ключами клиентов (например, пользователей) важно для объединенных вычислений, смоделированных в TFF. ПТФ обеспечивает интерфейс tff.simulation.datasets.ClientData
абстрагироваться над этой концепцией, и наборы данных которые TFF хостов ( StackOverflow , Shakespeare , emnist , cifar100 и gldv2 ) все реализуют этот интерфейс.
Если вы работаете на федеративном обучение с собственным набором данных, ПТФ настоятельно рекомендую либо реализовать ClientData
интерфейс или использовать один из вспомогательных функций , ПТФ, чтобы генерировать ClientData
, который представляет ваши данные на диске, например , tff.simulation.datasets.ClientData.from_clients_and_fn
.
В большинстве примеров ПТФ в впритык начать с ClientData
объекты, реализующие ClientData
интерфейс пользовательского набора данных будет сделать проще spelunk через существующий код , написанный с TFF. Кроме того, tf.data.Datasets
, которые ClientData
конструкции могут быть непосредственно итерации с получением структуры numpy
массивов, поэтому ClientData
объектов может быть использован с любым Python , на основе структуры ML , прежде чем перейти к TFF.
Есть несколько шаблонов, с помощью которых вы можете облегчить себе жизнь, если собираетесь масштабировать моделирование на множество машин или развертывать их. Ниже мы будем ходить через несколько способов , которые мы можем использовать ClientData
и TFF , чтобы сделать нашу мелкосерийное итерация к крупномасштабной экспериментирование-к производству опыт развертывания как можно более гладким.
Какой шаблон мне следует использовать для передачи ClientData в TFF?
Мы рассмотрим два обычаи ПТФ в ClientData
в глубину; Если вы подходите к одной из двух категорий, представленных ниже, вы явно предпочтете одну из них. В противном случае вам может потребоваться более подробное понимание плюсов и минусов каждого из них, чтобы сделать более тонкий выбор.
Я хочу выполнить итерацию как можно быстрее на локальной машине; Мне не нужно иметь возможность легко пользоваться преимуществами распределенной среды выполнения TFF.
- Вы хотите передать
tf.data.Datasets
, чтобы TFF непосредственно. - Это позволяет программировать императивно с
tf.data.Dataset
объектов, и обрабатывать их как угодно. - Он обеспечивает большую гибкость, чем вариант ниже; передача логики клиентам требует, чтобы эта логика была сериализуемой.
- Вы хотите передать
Я хочу запустить свои федеративные вычисления в удаленной среде выполнения TFF или планирую сделать это в ближайшее время.
- В этом случае вы хотите отобразить создание набора данных и предварительную обработку для клиентов.
- Это приводит вас прохождение просто список
client_ids
непосредственно к федеративному вычислений. - Передача клиентам создания набора данных и предварительной обработки позволяет избежать узких мест в сериализации и значительно повысить производительность при работе с сотнями и тысячами клиентов.
Настроить среду с открытым исходным кодом
# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
Импортировать пакеты
import collections
import time
import tensorflow as tf
import tensorflow_federated as tff
Управление объектом ClientData
Давайте начнем с загрузкой и изучением ПТФА в EMNIST ClientData
:
client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s] 2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Осмотрев первый набор данных может сказать нам , какой тип примеров в ClientData
.
first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
Обратите внимание , что выходы набора данных collections.OrderedDict
объектов , которые имеют pixels
и label
ключей, где пиксели есть тензор с формой [28, 28]
. Предположим , мы хотим , чтобы сгладить наши материалы к форме [784]
. Одним из возможных способов , мы можем сделать это было бы применить функцию предварительной обработки для нашего ClientData
объекта.
def preprocess_dataset(dataset):
"""Create batches of 5 examples, and limit to 3 batches."""
def map_fn(input):
return collections.OrderedDict(
x=tf.reshape(input['pixels'], shape=(-1, 784)),
y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
)
return dataset.batch(5).map(
map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)
preprocessed_client_data = client_data.preprocess(preprocess_dataset)
# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
Мы можем захотеть дополнительно выполнить более сложную (и, возможно, с сохранением состояния) предварительную обработку, например перемешивание.
def preprocess_and_shuffle(dataset):
"""Applies `preprocess_dataset` above and shuffles the result."""
preprocessed = preprocess_dataset(dataset)
return preprocessed.shuffle(buffer_size=5)
preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)
# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
Сопряжение с tff.Computation
Теперь, когда мы можем выполнить основные манипуляции с ClientData
объектами, мы готовы к кормлению данных на tff.Computation
. Определим tff.templates.IterativeProcess
, который реализует Федеративные Усреднение , а также изучить различные методы передачи его данных.
def model_fn():
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
])
return tff.learning.from_keras_model(
model,
# Note: input spec is the _batched_ shape, and includes the
# label tensor which will be passed to the loss function. This model is
# therefore configured to accept data _after_ it has been preprocessed.
input_spec=collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))
Перед тем , как приступить к работе с этим IterativeProcess
, один комментарий на семантике ClientData
в порядке. ClientData
объект представляет собой всю совокупность населения , доступного для федеративного обучения, что в целом является не доступным для выполнения среды производственной системы FL и специфичный для моделирования. ClientData
действительно дает пользователю способность к обводным федеративным вычислениям полностью и просто обучить серверную модель , как обычно , с помощью ClientData.create_tf_dataset_from_all_clients
.
Среда моделирования TFF дает исследователю полный контроль над внешним циклом. В частности, это подразумевает, что соображения доступности клиента, отключения клиента и т. Д. Должны решаться пользователем или скриптом драйвера Python. Можно было бы, например , модель клиента отсева, регулируя распределение выборки над вашим ClientData's
client_ids
такие , что пользователи с большим количеством данных (и соответственно более выполняющихся локальные вычисления) будут выбраны с меньшей вероятностью.
Однако в реальной федеративной системе обучающий модели не может явно выбирать клиентов; выбор клиентов делегируется системе, выполняющей объединенные вычисления.
Передача tf.data.Datasets
непосредственно TFF
Один из вариантов мы имеем для взаимодействия между ClientData
и IterativeProcess
является то , что построения tf.data.Datasets
в Python, и передавая эти наборы данных TFF.
Обратите внимание , что если мы используем наши препроцессированные ClientData
наборы данных мы уступаем имеют соответствующий тип ожидаемого нашей модели , определенной выше.
selected_client_ids = preprocessed_and_shuffled.client_ids[:10]
preprocessed_data_for_clients = [
preprocessed_and_shuffled.create_tf_dataset_for_client(
selected_client_ids[i]) for i in range(10)
]
state = trainer.initialize()
for _ in range(5):
t1 = time.time()
state, metrics = trainer.next(state, preprocessed_data_for_clients)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` loss 2.9005744457244873, round time 4.576513767242432 loss 3.113278388977051, round time 0.49641919136047363 loss 2.7581865787506104, round time 0.4904160499572754 loss 2.87259578704834, round time 0.48976993560791016 loss 3.1202380657196045, round time 0.6724586486816406
Если мы возьмем этот маршрут, однако, мы не сможем перейти к тривиальным многомашинному моделированию. Наборы данных мы построим в локальном режиме исполнения TensorFlow может захватывать состояние из окружающей среды питона, и не в сериализации или десериализации при попытке эталонного состояния , которое больше не доступны для них. Это может проявляться, например , в непостижимой ошибки из TensorFlow в tensor_util.cc
:
Check failed: DT_VARIANT == input.dtype() (21 vs. 20)
Построение карт и предварительная обработка клиентов
Чтобы избежать этой проблемы, ПТФ рекомендует свои пользователь рассмотреть набор данных экземпляра и предварительную обработку , как то , что происходит локально на каждом клиенте, и использовать помощник ПТФА или federated_map
явно запустить этот Preprocessing код на каждом клиенте.
Концептуально причина предпочтения этого ясна: в локальной среде выполнения TFF клиенты только «случайно» получают доступ к глобальной среде Python из-за того, что вся федеративная оркестровка происходит на одной машине. Здесь стоит отметить, что подобное мышление порождает кроссплатформенную, всегда сериализуемую функциональную философию TFF.
ПТФ делает такие изменения при помощи ClientData's
атрибуте dataset_computation
, в tff.Computation
, которая принимает client_id
и возвращает соответствующий tf.data.Dataset
.
Обратите внимание , что preprocess
просто работает с dataset_computation
; dataset_computation
атрибут препроцессора ClientData
включает весь трубопровод предварительной обработки мы только что определили:
print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing: (string -> <label=int32,pixels=float32[28,28]>*) dataset computation with preprocessing: (string -> <x=float32[?,784],y=int64[?,1]>*)
Мы могли бы вызвать dataset_computation
и получить нетерпеливый набор данных во время выполнения Python, но реальная сила этого подхода осуществляются при составят с итерационным процессом или другого вычислением , чтобы избежать материализаций этих наборов данных в глобальном нетерпеливом выполнении на всех. ПТФ обеспечивает функцию помощника tff.simulation.compose_dataset_computation_with_iterative_process
, который может быть использован , чтобы сделать именно это.
trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
preprocessed_and_shuffled.dataset_computation, trainer)
И это tff.templates.IterativeProcesses
и один выше запустить таким же образом; но бывший принимает препроцессированные клиентские наборы данных, а последний принимает строки , представляющих клиентские идентификаторы, обработка как строительства набора данных и Preprocessing в своем теле - на самом деле state
может передаваться между ними.
for _ in range(5):
t1 = time.time()
state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023 loss 2.7670371532440186, round time 0.5207102298736572 loss 2.665048122406006, round time 0.5302855968475342 loss 2.7213189601898193, round time 0.5313887596130371 loss 2.580148935317993, round time 0.5283482074737549
Масштабирование до большого количества клиентов
trainer_accepting_ids
может быть немедленно использован в многомашинном выполнении ПТФА, и избегает материализации tf.data.Datasets
и контроллера (и , следовательно , сериализации их и отправить их к рабочим).
Это значительно ускоряет распределенное моделирование, особенно с большим количеством клиентов, и позволяет выполнять промежуточную агрегацию, чтобы избежать аналогичных накладных расходов на сериализацию / десериализацию.
Дополнительное глубокое погружение: вручную составлять логику предварительной обработки в TFF
TFF создан для композиционности с нуля; вид композиции, только что выполненный помощником TFF, полностью находится под нашим контролем как пользователей. Мы могли бы вручную составить расчет предварительной обработки мы только что определили с тренером собственного next
довольно просто:
selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)
@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
return trainer.next(server_state, preprocessed_data)
manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)
Фактически, это фактически то, что использованный нами помощник делает под капотом (плюс выполняет соответствующую проверку типов и манипуляции). Мы могли бы даже выражали ту же логику несколько иначе, посредством сериализации preprocess_and_shuffle
в tff.Computation
и разложение federated_map
в один шаг , который строит не-препроцессор наборов данных и другие , который проходит preprocess_and_shuffle
на каждом клиенте.
Мы можем убедиться, что этот более ручной путь приводит к вычислениям с той же сигнатурой типа, что и помощник TFF (имена параметров по модулю):
print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>) (<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)