הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת |
הרעיון של מערך נתונים המבוסס על ידי לקוחות (למשל משתמשים) חיוני לחישוב מאוחד כפי שעוצב ב-TFF. TFF מספק ממשק tff.simulation.datasets.ClientData
כדי מופשט מעל המושג הזה, ואת מערכי נתונים אשר המארחים TFF ( StackOverflow , שייקספיר , emnist , cifar100 , ו gldv2 ) כל ליישם ממשק זה.
אם אתה עובד על למידה Federated עם הנתונים שלכם, TFF ממליצה לך גם ליישם את ClientData
אחד ממשק או שימוש בפונקציות העוזר של TFF ליצור ClientData
המייצגת את הנתונים בדיסק, למשל tff.simulation.datasets.ClientData.from_clients_and_fn
.
כמו רוב דוגמאות מקצה לקצה של TFF להתחיל עם ClientData
אובייקטים, יישום ClientData
ממשק עם הנתונים מותאמים אישית שלך תקל על spelunk באמצעות הקוד הקיים בכתב עם TFF. יתר על כן, tf.data.Datasets
אשר ClientData
המבנים ניתן iterated מעל ישירות להניב מבנים של numpy
מערכים, כך ClientData
עצמים ניתן להשתמש עם כל מסגרת פיתון מבוססי ML לפני שעבר TFF.
ישנם מספר דפוסים שבאמצעותם אתה יכול להקל על חייך אם אתה מתכוון להגדיל את הסימולציות שלך למכונות רבות או לפרוס אותן. להלן נסקור כמה מן הדרכים שבהן אנו יכולים להשתמש ClientData
ו TFF לעשות בקנה מידה קטן שלנו איטרציה-אל-מידה גדול ניסויים-ייצור ניסיון פריסה חלקה ככל האפשר.
באיזה דפוס עלי להשתמש כדי להעביר ClientData לתוך TFF?
נדונו בשני שימושים של של TFF ClientData
לעומק; אם אתה מתאים לאחת משתי הקטגוריות שלהלן, ברור שתעדיף אחת על השנייה. אם לא, ייתכן שתזדקק להבנה מפורטת יותר של היתרונות והחסרונות של כל אחד מהם כדי לעשות בחירה יותר ניואנסית.
אני רוצה לחזור כמה שיותר מהר על מכונה מקומית; אני לא צריך להיות מסוגל לנצל בקלות את זמן הריצה המבוזר של TFF.
- אתה רוצה להעביר
tf.data.Datasets
כדי TFF ישירות. - זה מאפשר לך לתכנת במפגיע עם
tf.data.Dataset
אובייקטים, ולעבד אותם באופן שרירותי. - זה מספק יותר גמישות מהאפשרות שלהלן; דחיפה של לוגיקה ללקוחות דורשת שהלוגיקה הזו תהיה ניתנת לסידרה.
- אתה רוצה להעביר
אני רוצה להפעיל את החישוב המאוחד שלי בזמן ריצה מרחוק של TFF, או שאני מתכנן לעשות זאת בקרוב.
- במקרה זה ברצונך למפות בניית מערכי נתונים ועיבוד מקדים ללקוחות.
- תוצאות זה אתה עובר פשוט רשימה של
client_ids
ישירות חישוב Federated שלך. - דחיפה של בניית מערכי נתונים ועיבוד מקדים ללקוחות מונעת צווארי בקבוק בסריאליזציה, ומגדילה משמעותית את הביצועים עם מאות עד אלפי לקוחות.
הגדר סביבת קוד פתוח
# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
ייבוא חבילות
import collections
import time
import tensorflow as tf
import tensorflow_federated as tff
מניפולציה של אובייקט ClientData
בואו נתחיל בכך העמסה לחקור EMNIST של TFF ClientData
:
client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s] 2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
בדיקת הנתונים הראשונים יכולה לספר לנו איזה סוג של דוגמא נמצאות ClientData
.
first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
שים לב כי התשואות במערך collections.OrderedDict
חפצות שיש pixels
ו label
מפתחות, שבו פיקסלים הוא מותח עם צורה [28, 28]
. נניח שברצוננו לשטח תשומות שלנו אל הצורה [784]
. דרך אחת אפשרית שאנחנו יכולים לעשות זה יהיה ליישם פונקציה עיבוד מראש כדי שלנו ClientData
האובייקט.
def preprocess_dataset(dataset):
"""Create batches of 5 examples, and limit to 3 batches."""
def map_fn(input):
return collections.OrderedDict(
x=tf.reshape(input['pixels'], shape=(-1, 784)),
y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
)
return dataset.batch(5).map(
map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)
preprocessed_client_data = client_data.preprocess(preprocess_dataset)
# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
ייתכן שנרצה בנוסף לבצע עיבוד מוקדם יותר מורכב (ואולי מצבי), למשל ערבוב.
def preprocess_and_shuffle(dataset):
"""Applies `preprocess_dataset` above and shuffles the result."""
preprocessed = preprocess_dataset(dataset)
return preprocessed.shuffle(buffer_size=5)
preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)
# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
התממשקות עם tff.Computation
עכשיו אנחנו יכולים לבצע כמה מניפולציות בסיסיות עם ClientData
חפץ, אנחנו מוכנים עדכון נתונים על tff.Computation
. אנו מגדירים tff.templates.IterativeProcess
אשר מיישם ממוצעי Federated , ולחקור שיטות שונות של העברתו נתונים.
def model_fn():
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
])
return tff.learning.from_keras_model(
model,
# Note: input spec is the _batched_ shape, and includes the
# label tensor which will be passed to the loss function. This model is
# therefore configured to accept data _after_ it has been preprocessed.
input_spec=collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))
לפני שאנו מתחילים לעבוד עם זה IterativeProcess
, תגובה אחת על הסמנטיקה של ClientData
היא בסדר. ClientData
אובייקט מייצג את מכלול האוכלוסייה האפשרית לאימוני Federated, אשר באופן כללי הוא לא זמין לסביבת ביצוע מערכת ייצור FL הוא ספציפי סימולציה. ClientData
אכן נותן למשתמש את היכולת מחשוב Federated עוקף לחלוטין ופשוט לאמן מודל בצד השרת כרגיל באמצעות ClientData.create_tf_dataset_from_all_clients
.
סביבת הסימולציה של TFF מעמידה את החוקר בשליטה מלאה על הלולאה החיצונית. במיוחד זה מרמז על שיקולים של זמינות לקוח, נשירת לקוח וכו', חייבים להיות מטופלים על ידי המשתמש או סקריפט מנהל ההתקן של Python. לכן, ניתן לומר על נשירת לקוח מודל לדוגמא על ידי התאמת התפלגות הדגימה מעל שלך ClientData's
client_ids
שמשתמש כאלה עם נתונים יותר (ובהתאמה כבר ריצה חישובים מקומיים) ייבחר עם הסתברות נמוכה.
עם זאת, במערכת מאוחדת אמיתית, לקוחות לא יכולים להיבחר במפורש על ידי מאמן המודלים; בחירת הלקוחות מואצלת למערכת שמבצעת את החישוב המאוחד.
עובר tf.data.Datasets
ישירות TFF
אפשרות אחת יש לנו להתממשקות בין ClientData
וכן IterativeProcess
היא בניית tf.data.Datasets
בפייתון, ולהעביר מערכי נתונים אלה כדי TFF.
שימו לב שאם אנו משתמשים שעברו עיבוד מקדים שלנו ClientData
בבסיסי הנתונים שאנו להניב הם מהסוג המתאים צפוי ידי המודל שלנו כמוגדר לעיל.
selected_client_ids = preprocessed_and_shuffled.client_ids[:10]
preprocessed_data_for_clients = [
preprocessed_and_shuffled.create_tf_dataset_for_client(
selected_client_ids[i]) for i in range(10)
]
state = trainer.initialize()
for _ in range(5):
t1 = time.time()
state, metrics = trainer.next(state, preprocessed_data_for_clients)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` loss 2.9005744457244873, round time 4.576513767242432 loss 3.113278388977051, round time 0.49641919136047363 loss 2.7581865787506104, round time 0.4904160499572754 loss 2.87259578704834, round time 0.48976993560791016 loss 3.1202380657196045, round time 0.6724586486816406
אם ניקח המסלול הזה, עם זאת, לא נוכל לעבור טריוויאלית כדי סימולציה multimachine. מערכי הנתונים אנו בונים את ריצת TensorFlow המקומית יכולים ללכוד מדינה מסביבת פיתון שמסביב, וגם להיכשל בהמשכים או deserialization כשהם מנסים מצב התייחסות אשר איננו זמין להם. זה יכול להתבטא למשל שגיאה מובנת מן של TensorFlow tensor_util.cc
:
Check failed: DT_VARIANT == input.dtype() (21 vs. 20)
מיפוי בנייה ועיבוד מקדים על פני הלקוחות
כדי למנוע בעיה זו, TFF ממליצה למשתמשים שלה כדי לשקול במערך אדגום וביצוע עיבוד מקדים כמשהו שקורה באופן מקומי על כל לקוח, ולהשתמש העוזרים של TFF או federated_map
לרוץ זה קוד עיבוד מקדים במפורש על כל לקוח.
מבחינה קונספטואלית, הסיבה להעדפת זה ברורה: בזמן הריצה המקומי של TFF, ללקוחות יש רק "בטעות" גישה לסביבת Python העולמית בשל העובדה שכל התזמור המאוחד מתרחש על מכונה אחת. ראוי לציין בשלב זה שחשיבה דומה מולידה את הפילוסופיה הפונקציונלית חוצת הפלטפורמות, הניתנת תמיד להמשכה, של TFF.
TFF מבצע שינוי כזה פשוט באמצעות ClientData's
תכונה dataset_computation
, A tff.Computation
אשר לוקח client_id
ומחזירה את קשורה tf.data.Dataset
.
שים לב preprocess
פשוט עובד עם dataset_computation
; dataset_computation
התכונה של מעובדי ClientData
משלבת בצנרת המקדימה כול אנחנו פשוט מוגדרות:
print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing: (string -> <label=int32,pixels=float32[28,28]>*) dataset computation with preprocessing: (string -> <x=float32[?,784],y=int64[?,1]>*)
יכולנו להפעיל dataset_computation
ולקבל במערך להוט על ריצת Python, אבל הכח האמיתי של גישה זו מבוצע כאשר אנו להלחין עם תהליך שחוזר על עצמו או חישוב אחר, כדי למנוע התממשות מערכי נתונים אלה ריצה העולמי הלהוט בכלל. TFF מספק פונקציה עוזר tff.simulation.compose_dataset_computation_with_iterative_process
אשר ניתן להשתמש בהם כדי לעשות בדיוק את זה.
trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
preprocessed_and_shuffled.dataset_computation, trainer)
שניהם זה tff.templates.IterativeProcesses
ואת אחד מעל לרוץ באותה דרך; אבל לשעבר מקבל מערכי נתוני לקוח שעברו עיבוד ראשוניים, ולאחר שהאחרון מקבל מחרוזות המייצגות מזהות לקוח, טיפול שני בנייה נתון וביצוע עיבוד מקדימים בגוף שלה - למעשה state
יכולה להיות מועברת בין שתיים.
for _ in range(5):
t1 = time.time()
state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023 loss 2.7670371532440186, round time 0.5207102298736572 loss 2.665048122406006, round time 0.5302855968475342 loss 2.7213189601898193, round time 0.5313887596130371 loss 2.580148935317993, round time 0.5283482074737549
קנה מידה למספר רב של לקוחות
trainer_accepting_ids
יכול לשמש מייד ריצת multimachine של TFF, ונמנע להתממש tf.data.Datasets
ואת הבקר (ולכן בהמשכים אותם שולחים אותם לפועלים).
זה מזרז באופן משמעותי סימולציות מבוזרות, במיוחד עם מספר רב של לקוחות, ומאפשר צבירה ביניים כדי למנוע תקורה של סריאליזציה/דה-סריאליזציה דומה.
צניחה עמוקה אופציונלית: חיבור ידני של לוגיקה של עיבוד מקדים ב-TFF
TFF מיועד לקומפוזיציות מהיסוד; סוג הקומפוזיציה שבוצע זה עתה על ידי העוזר של TFF נמצא בשליטה מלאה שלנו כמשתמשים. אנחנו יכולים להיות להלחין בחישוב המקדים ידני רק הגדרנו עם המאמן עצמו next
בפשטות:
selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)
@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
return trainer.next(server_state, preprocessed_data)
manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)
למעשה, זה למעשה מה שעוזר שהשתמשנו בו עושה מתחת למכסה המנוע (ובנוסף ביצוע בדיקת סוג מתאים ומניפולציה). יכולנו אפילו הביעו את אותה לוגיקה מעט שונה, על ידי בהמשכים preprocess_and_shuffle
לתוך tff.Computation
, וכן לפירוק federated_map
לתוך צעד אחד אשר בונה מערכי נתונים-מעובדים האו"ם ועוד המפעילה preprocess_and_shuffle
על כל לקוח.
אנו יכולים לוודא שהנתיב היותר ידני זה מביא לחישובים עם חתימת סוג זהה לזה של המסייע של TFF (שמות פרמטרים מודולו):
print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>) (<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)