Zobacz na TensorFlow.org | Uruchom w Google Colab | Wyświetl źródło na GitHub | Pobierz notatnik |
Pojęcie zestawu danych z kluczami klientów (np. użytkowników) jest niezbędne do obliczeń federacyjnych, jak modelowano w TFF. TFF zapewnia interfejs tff.simulation.datasets.ClientData
abstrahować nad tą koncepcją, a zbiory danych, które hosty TFF ( stackoverflow , Szekspir , emnist , cifar100 i gldv2 ) wszystko wdrożyć ten interfejs.
Jeśli pracujesz na stowarzyszonym nauki z własnego zbioru danych, TFF silnie zachęca albo wdrożyć ClientData
jeden interfejs lub użyć funkcji pomocniczych TFF, aby wygenerować ClientData
który reprezentuje dane na dysku, np tff.simulation.datasets.ClientData.from_clients_and_fn
.
Jak większość z end-to-end przykładów TFF zaczynają z ClientData
obiektów, wdrożenia ClientData
interfejs z niestandardowego zestawu danych będzie łatwiej spelunk pośrednictwem istniejącego kodu napisanego z TFF. Ponadto, tf.data.Datasets
które ClientData
konstrukty mogą być iterowane się bezpośrednio z wytworzeniem struktury numpy
tablicach, tak ClientData
przedmiotów można stosować Pythona oparciu ramami ML przed przejściem do TFF.
Istnieje kilka wzorców, dzięki którym możesz ułatwić sobie życie, jeśli zamierzasz skalować swoje symulacje na wielu maszynach lub je wdrażać. Poniżej będziemy chodzić przez kilka sposobów możemy wykorzystać ClientData
i TFF, aby nasz małą skalę iteracja-na dużą skalę eksperymenty, do produkcji doświadczenie wdrażania gładka jak to możliwe.
Jakiego wzorca powinienem użyć, aby przekazać ClientData do TFF?
Omówimy dwa zwyczaje TFF za ClientData
głębokości; jeśli pasujesz do jednej z dwóch poniższych kategorii, wyraźnie wolisz jedną od drugiej. Jeśli nie, możesz potrzebować bardziej szczegółowego zrozumienia zalet i wad każdego z nich, aby dokonać bardziej zniuansowanego wyboru.
Chcę jak najszybciej wykonać iterację na komputerze lokalnym; Nie muszę być w stanie łatwo korzystać z rozproszonego środowiska wykonawczego TFF.
- Chcesz przekazać
tf.data.Datasets
do TFF bezpośrednio. - To pozwala na zaprogramowanie koniecznie z
tf.data.Dataset
obiektów oraz przetwarzać je w sposób arbitralny. - Zapewnia większą elastyczność niż opcja poniżej; wypychanie logiki do klientów wymaga, aby ta logika była możliwa do serializacji.
- Chcesz przekazać
Chcę uruchomić swoje sfederowane obliczenia w zdalnym środowisku wykonawczym TFF lub planuję to zrobić wkrótce.
- W takim przypadku chcesz odwzorować budowę i wstępne przetwarzanie zestawu danych na klientów.
- To skutkuje Ciebie przechodząc po prostu listę
client_ids
bezpośrednio do stowarzyszonego obliczeń. - Wypychanie konstrukcji zestawu danych i wstępnego przetwarzania do klientów pozwala uniknąć wąskich gardeł w serializacji i znacznie zwiększa wydajność w przypadku setek do tysięcy klientów.
Skonfiguruj środowisko open-source
# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
Importuj paczki
import collections
import time
import tensorflow as tf
import tensorflow_federated as tff
Manipulowanie obiektem ClientData
Zacznijmy od załadunku i odkrywania TFF za EMNIST ClientData
:
client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s] 2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Kontrola pierwszego zestawu danych może nam powiedzieć, jakiego rodzaju przykłady są w ClientData
.
first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
Należy pamiętać, że wydajność zbioru danych collections.OrderedDict
obiektów, które mają pixels
i label
klawiszy gdzie pikseli jest napinacz w kształcie [28, 28]
. Załóżmy, że chcemy wyprostować nasze wejść się do kształtu [784]
. Jednym z możliwych sposobów możemy to zrobić byłoby zastosować funkcję wstępnego przetwarzania do naszego ClientData
obiektu.
def preprocess_dataset(dataset):
"""Create batches of 5 examples, and limit to 3 batches."""
def map_fn(input):
return collections.OrderedDict(
x=tf.reshape(input['pixels'], shape=(-1, 784)),
y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
)
return dataset.batch(5).map(
map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)
preprocessed_client_data = client_data.preprocess(preprocess_dataset)
# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
Możemy chcieć dodatkowo wykonać bardziej złożone (i prawdopodobnie stanowe) przetwarzanie wstępne, na przykład tasowanie.
def preprocess_and_shuffle(dataset):
"""Applies `preprocess_dataset` above and shuffles the result."""
preprocessed = preprocess_dataset(dataset)
return preprocessed.shuffle(buffer_size=5)
preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)
# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
Relacje ze tff.Computation
Teraz możemy wykonać pewne podstawowe manipulacje z ClientData
obiektów, jesteśmy gotowi do danych zasilających do tff.Computation
. Definiujemy tff.templates.IterativeProcess
który implementuje Federalne Uśrednianie i zbadać różne sposoby przekazując mu dane.
def model_fn():
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
])
return tff.learning.from_keras_model(
model,
# Note: input spec is the _batched_ shape, and includes the
# label tensor which will be passed to the loss function. This model is
# therefore configured to accept data _after_ it has been preprocessed.
input_spec=collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))
Zanim rozpoczniemy pracę z tym IterativeProcess
, jeden komentarz na semantyki ClientData
jest w porządku. ClientData
obiekt reprezentuje całość populacji dostępnej dla stowarzyszonym szkolenia, które w ogóle jest niedostępna dla środowiska wykonanie systemu produkcyjnego FL i jest specyficzny dla symulacji. ClientData
rzeczywiście daje użytkownikowi zdolność do ominięcia stowarzyszonym computing całkowicie i po prostu trenować model po stronie serwera jak zwykle poprzez ClientData.create_tf_dataset_from_all_clients
.
Środowisko symulacyjne TFF daje badaczowi pełną kontrolę nad zewnętrzną pętlą. W szczególności oznacza to, że względy dostępności klienta, porzucania klienta itp. muszą być uwzględnione w skrypcie użytkownika lub sterownika Pythona. Można by na przykład model klient porzucaniu przez dostosowanie rozkładu próbkowania nad ClientData's
client_ids
taki, że użytkownicy z większej ilości danych (i odpowiednio dłużej działa lokalnych obliczeń) zostanie wybrany z niższym prawdopodobieństwem.
W rzeczywistym systemie sfederowanym klienci nie mogą być jednak wyraźnie wybierani przez trenera modelu; wybór klientów jest delegowany do systemu, który wykonuje obliczenia sfederowane.
Przechodząc tf.data.Datasets
bezpośrednio do TFF
Jedną z opcji mamy na współpracę między ClientData
i IterativeProcess
polega na konstruowaniu tf.data.Datasets
w Pythonie i przekazywanie tych zestawów danych do TFF.
Należy zauważyć, że, gdy używamy wstępnie przygotowane ClientData
zestawy danych, to jest wydajność odpowiedniego rodzaju oczekiwanego naszym modelu określono powyżej.
selected_client_ids = preprocessed_and_shuffled.client_ids[:10]
preprocessed_data_for_clients = [
preprocessed_and_shuffled.create_tf_dataset_for_client(
selected_client_ids[i]) for i in range(10)
]
state = trainer.initialize()
for _ in range(5):
t1 = time.time()
state, metrics = trainer.next(state, preprocessed_data_for_clients)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` loss 2.9005744457244873, round time 4.576513767242432 loss 3.113278388977051, round time 0.49641919136047363 loss 2.7581865787506104, round time 0.4904160499572754 loss 2.87259578704834, round time 0.48976993560791016 loss 3.1202380657196045, round time 0.6724586486816406
Jeśli weźmiemy tę trasę, jednak będziemy w stanie przenieść się trywialnie multimachine symulacji. Zestawy danych budujemy w lokalnym środowisku wykonawczym TensorFlow może przechwycić stan z otaczającego środowiska Pythona, a nie w serializacji lub deserializacji gdy próbują stanu odniesienia, który nie jest już dostępna na nich jest. Może się to objawiać na przykład w niezgłębionej błędu z TensorFlow za tensor_util.cc
:
Check failed: DT_VARIANT == input.dtype() (21 vs. 20)
Mapowanie konstrukcji i preprocessingu nad klientami
Aby uniknąć tego problemu, TFF zaleca użytkownikom rozważyć zestawu danych instancji i przerób jako coś, co dzieje się lokalnie na każdym kliencie i użyć pomocników TFF lub federated_map
jawnie uruchomić ten kod przerób na każdym kliencie.
Koncepcyjnie powód preferowania tego jest jasny: w lokalnym środowisku wykonawczym TFF klienci mają dostęp do globalnego środowiska Python tylko „przypadkowo”, ponieważ cała sfederowana orkiestracja odbywa się na jednym komputerze. Warto w tym miejscu zauważyć, że podobne myślenie daje początek wieloplatformowej, zawsze możliwej do serializacji, funkcjonalnej filozofii TFF.
TFF sprawia, że zmiana taka prosta poprzez ClientData's
atrybut dataset_computation
, a tff.Computation
który odbywa się client_id
i zwraca powiązany tf.data.Dataset
.
Zauważ, że preprocess
po prostu działa z dataset_computation
; dataset_computation
atrybutem obróbką ClientData
zawiera całą rurociągu przebiegu wyprzedzającego po prostu zdefiniowane:
print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing: (string -> <label=int32,pixels=float32[28,28]>*) dataset computation with preprocessing: (string -> <x=float32[?,784],y=int64[?,1]>*)
Możemy powołać dataset_computation
i otrzymać chętny zestaw danych w czasie wykonywania Pythona, ale prawdziwa siła tego podejścia jest wykonywana gdy komponujemy z iteracyjnego procesu lub innego obliczeń uniknięcia materializacji tych zbiorów danych w globalnym chętny wykonywania w ogóle. TFF zapewnia funkcję pomocnika tff.simulation.compose_dataset_computation_with_iterative_process
który może być używany do dokładnie to zrobić.
trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
preprocessed_and_shuffled.dataset_computation, trainer)
Zarówno ten tff.templates.IterativeProcesses
i jeden powyżej uruchomić ten sam sposób; ale były akceptuje wstępnie przygotowane zestawy danych klienta, a ten ostatni akceptuje ciągów reprezentujących identyfikatory klientów, obsługi zarówno budowę zestawu danych i przerób w jego ciele - w rzeczywistości state
mogą być przekazywane między nimi.
for _ in range(5):
t1 = time.time()
state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023 loss 2.7670371532440186, round time 0.5207102298736572 loss 2.665048122406006, round time 0.5302855968475342 loss 2.7213189601898193, round time 0.5313887596130371 loss 2.580148935317993, round time 0.5283482074737549
Skalowanie do dużej liczby klientów
trainer_accepting_ids
mogą być natychmiast wykorzystane w multimachine wykonywania TFF, a unika materializacji tf.data.Datasets
i kontroler (a zatem ich szeregowania i wysyłając je do robotników).
To znacznie przyspiesza symulacje rozproszone, zwłaszcza w przypadku dużej liczby klientów, i umożliwia pośrednią agregację, aby uniknąć podobnych kosztów związanych z serializacją/deserializacją.
Opcjonalne deepdive: ręczne komponowanie logiki przetwarzania wstępnego w TFF
TFF jest zaprojektowany od podstaw z myślą o kompozycyjności; rodzaj kompozycji wykonanej właśnie przez pomocnika TFF jest w pełni pod naszą kontrolą jako użytkowników. Mogliśmy ręcznie komponować obliczeń przebiegu wyprzedzającego właśnie zdefiniowana z własnego trener next
całkiem prosto:
selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)
@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
return trainer.next(server_state, preprocessed_data)
manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)
W rzeczywistości to właśnie robi pod maską pomocnik, którego użyliśmy (plus wykonanie odpowiedniego sprawdzenia typu i manipulacji). Mogliśmy nawet wyraziły taką samą logikę nieco inaczej, przez szeregowania preprocess_and_shuffle
w tff.Computation
i rozkładając federated_map
w jeden krok, który konstruuje un-preprocesowany zestawów danych i innej, która biegnie preprocess_and_shuffle
na każdego klienta.
Możemy zweryfikować, że ta bardziej ręczna ścieżka prowadzi do obliczeń z taką samą sygnaturą typu jak pomocnik TFF (nazwy parametrów modulo):
print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>) (<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)