Работа с ClientData tff.

Посмотреть на TensorFlow.org Запускаем в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Понятие набора данных с ключами клиентов (например, пользователей) важно для объединенных вычислений, смоделированных в TFF. ПТФ обеспечивает интерфейс tff.simulation.datasets.ClientData абстрагироваться над этой концепцией, и наборы данных которые TFF хостов ( StackOverflow , Shakespeare , emnist , cifar100 и gldv2 ) все реализуют этот интерфейс.

Если вы работаете на федеративном обучение с собственным набором данных, ПТФ настоятельно рекомендую либо реализовать ClientData интерфейс или использовать один из вспомогательных функций , ПТФ, чтобы генерировать ClientData , который представляет ваши данные на диске, например , tff.simulation.datasets.ClientData.from_clients_and_fn .

В большинстве примеров ПТФ в впритык начать с ClientData объекты, реализующие ClientData интерфейс пользовательского набора данных будет сделать проще spelunk через существующий код , написанный с TFF. Кроме того, tf.data.Datasets , которые ClientData конструкции могут быть непосредственно итерации с получением структуры numpy массивов, поэтому ClientData объектов может быть использован с любым Python , на основе структуры ML , прежде чем перейти к TFF.

Есть несколько шаблонов, с помощью которых вы можете облегчить себе жизнь, если собираетесь масштабировать моделирование на множество машин или развертывать их. Ниже мы будем ходить через несколько способов , которые мы можем использовать ClientData и TFF , чтобы сделать нашу мелкосерийное итерация к крупномасштабной экспериментирование-к производству опыт развертывания как можно более гладким.

Какой шаблон мне следует использовать для передачи ClientData в TFF?

Мы рассмотрим два обычаи ПТФ в ClientData в глубину; Если вы подходите к одной из двух категорий, представленных ниже, вы явно предпочтете одну из них. В противном случае вам может потребоваться более подробное понимание плюсов и минусов каждого из них, чтобы сделать более тонкий выбор.

  • Я хочу выполнить итерацию как можно быстрее на локальной машине; Мне не нужно иметь возможность легко пользоваться преимуществами распределенной среды выполнения TFF.

    • Вы хотите передать tf.data.Datasets , чтобы TFF непосредственно.
    • Это позволяет программировать императивно с tf.data.Dataset объектов, и обрабатывать их как угодно.
    • Он обеспечивает большую гибкость, чем вариант ниже; передача логики клиентам требует, чтобы эта логика была сериализуемой.
  • Я хочу запустить свои федеративные вычисления в удаленной среде выполнения TFF или планирую сделать это в ближайшее время.

    • В этом случае вы хотите отобразить создание набора данных и предварительную обработку для клиентов.
    • Это приводит вас прохождение просто список client_ids непосредственно к федеративному вычислений.
    • Передача клиентам создания набора данных и предварительной обработки позволяет избежать узких мест в сериализации и значительно повысить производительность при работе с сотнями и тысячами клиентов.

Настроить среду с открытым исходным кодом

Импортировать пакеты

Управление объектом ClientData

Давайте начнем с загрузкой и изучением ПТФА в EMNIST ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s]
2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Осмотрев первый набор данных может сказать нам , какой тип примеров в ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

Обратите внимание , что выходы набора данных collections.OrderedDict объектов , которые имеют pixels и label ключей, где пиксели есть тензор с формой [28, 28] . Предположим , мы хотим , чтобы сгладить наши материалы к форме [784] . Одним из возможных способов , мы можем сделать это было бы применить функцию предварительной обработки для нашего ClientData объекта.

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Мы можем захотеть дополнительно выполнить более сложную (и, возможно, с сохранением состояния) предварительную обработку, например перемешивание.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Сопряжение с tff.Computation

Теперь, когда мы можем выполнить основные манипуляции с ClientData объектами, мы готовы к кормлению данных на tff.Computation . Определим tff.templates.IterativeProcess , который реализует Федеративные Усреднение , а также изучить различные методы передачи его данных.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

Перед тем , как приступить к работе с этим IterativeProcess , один комментарий на семантике ClientData в порядке. ClientData объект представляет собой всю совокупность населения , доступного для федеративного обучения, что в целом является не доступным для выполнения среды производственной системы FL и специфичный для моделирования. ClientData действительно дает пользователю способность к обводным федеративным вычислениям полностью и просто обучить серверную модель , как обычно , с помощью ClientData.create_tf_dataset_from_all_clients .

Среда моделирования TFF дает исследователю полный контроль над внешним циклом. В частности, это подразумевает, что соображения доступности клиента, отключения клиента и т. Д. Должны решаться пользователем или скриптом драйвера Python. Можно было бы, например , модель клиента отсева, регулируя распределение выборки над вашим ClientData's client_ids такие , что пользователи с большим количеством данных (и соответственно более выполняющихся локальные вычисления) будут выбраны с меньшей вероятностью.

Однако в реальной федеративной системе обучающий модели не может явно выбирать клиентов; выбор клиентов делегируется системе, выполняющей объединенные вычисления.

Передача tf.data.Datasets непосредственно TFF

Один из вариантов мы имеем для взаимодействия между ClientData и IterativeProcess является то , что построения tf.data.Datasets в Python, и передавая эти наборы данных TFF.

Обратите внимание , что если мы используем наши препроцессированные ClientData наборы данных мы уступаем имеют соответствующий тип ожидаемого нашей модели , определенной выше.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 2.9005744457244873, round time 4.576513767242432
loss 3.113278388977051, round time 0.49641919136047363
loss 2.7581865787506104, round time 0.4904160499572754
loss 2.87259578704834, round time 0.48976993560791016
loss 3.1202380657196045, round time 0.6724586486816406

Если мы возьмем этот маршрут, однако, мы не сможем перейти к тривиальным многомашинному моделированию. Наборы данных мы построим в локальном режиме исполнения TensorFlow может захватывать состояние из окружающей среды питона, и не в сериализации или десериализации при попытке эталонного состояния , которое больше не доступны для них. Это может проявляться, например , в непостижимой ошибки из TensorFlow в tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

Построение карт и предварительная обработка клиентов

Чтобы избежать этой проблемы, ПТФ рекомендует свои пользователь рассмотреть набор данных экземпляра и предварительную обработку , как то , что происходит локально на каждом клиенте, и использовать помощник ПТФА или federated_map явно запустить этот Preprocessing код на каждом клиенте.

Концептуально причина предпочтения этого ясна: в локальной среде выполнения TFF клиенты только «случайно» получают доступ к глобальной среде Python из-за того, что вся федеративная оркестровка происходит на одной машине. Здесь стоит отметить, что подобное мышление порождает кроссплатформенную, всегда сериализуемую функциональную философию TFF.

ПТФ делает такие изменения при помощи ClientData's атрибуте dataset_computation , в tff.Computation , которая принимает client_id и возвращает соответствующий tf.data.Dataset .

Обратите внимание , что preprocess просто работает с dataset_computation ; dataset_computation атрибут препроцессора ClientData включает весь трубопровод предварительной обработки мы только что определили:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

Мы могли бы вызвать dataset_computation и получить нетерпеливый набор данных во время выполнения Python, но реальная сила этого подхода осуществляются при составят с итерационным процессом или другого вычислением , чтобы избежать материализаций этих наборов данных в глобальном нетерпеливом выполнении на всех. ПТФ обеспечивает функцию помощника tff.simulation.compose_dataset_computation_with_iterative_process , который может быть использован , чтобы сделать именно это.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

И это tff.templates.IterativeProcesses и один выше запустить таким же образом; но бывший принимает препроцессированные клиентские наборы данных, а последний принимает строки , представляющих клиентские идентификаторы, обработка как строительства набора данных и Preprocessing в своем теле - на самом деле state может передаваться между ними.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023
loss 2.7670371532440186, round time 0.5207102298736572
loss 2.665048122406006, round time 0.5302855968475342
loss 2.7213189601898193, round time 0.5313887596130371
loss 2.580148935317993, round time 0.5283482074737549

Масштабирование до большого количества клиентов

trainer_accepting_ids может быть немедленно использован в многомашинном выполнении ПТФА, и избегает материализации tf.data.Datasets и контроллера (и , следовательно , сериализации их и отправить их к рабочим).

Это значительно ускоряет распределенное моделирование, особенно с большим количеством клиентов, и позволяет выполнять промежуточную агрегацию, чтобы избежать аналогичных накладных расходов на сериализацию / десериализацию.

Дополнительное глубокое погружение: вручную составлять логику предварительной обработки в TFF

TFF создан для композиционности с нуля; вид композиции, только что выполненный помощником TFF, полностью находится под нашим контролем как пользователей. Мы могли бы вручную составить расчет предварительной обработки мы только что определили с тренером собственного next довольно просто:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

Фактически, это фактически то, что использованный нами помощник делает под капотом (плюс выполняет соответствующую проверку типов и манипуляции). Мы могли бы даже выражали ту же логику несколько иначе, посредством сериализации preprocess_and_shuffle в tff.Computation и разложение federated_map в один шаг , который строит не-препроцессор наборов данных и другие , который проходит preprocess_and_shuffle на каждом клиенте.

Мы можем убедиться, что этот более ручной путь приводит к вычислениям с той же сигнатурой типа, что и помощник TFF (имена параметров по модулю):

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)