ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
แนวคิดของชุดข้อมูลที่คีย์โดยไคลเอ็นต์ (เช่น ผู้ใช้) มีความสำคัญต่อการคำนวณแบบรวมศูนย์ตามแบบจำลองใน TFF ฉิบหายให้ติดต่อ tff.simulation.datasets.ClientData
ที่เป็นนามธรรมมากกว่าแนวคิดนี้และชุดข้อมูลที่โฮสต์ฉิบหาย ( StackOverflow , เช็คสเปียร์ , emnist , cifar100 และ gldv2 ) ทั้งหมดใช้อินเตอร์เฟซนี้
ถ้าคุณกำลังทำงานกับการเรียนรู้แบบ federated กับชุดของคุณเองฉิบหายขอสนับสนุนให้คุณสามารถดำเนินการอย่างใดอย่างหนึ่ง ClientData
อินเตอร์เฟซหรือการใช้หนึ่งฉิบหายของฟังก์ชั่นผู้ช่วยในการสร้าง ClientData
ซึ่งหมายถึงข้อมูลของคุณบนดิสก์เช่น tff.simulation.datasets.ClientData.from_clients_and_fn
.
เป็นที่สุดของฉิบหายของตัวอย่างแบบ end-to-end ที่เริ่มต้นด้วย ClientData
วัตถุ, การดำเนินการ ClientData
อินเตอร์เฟซที่มีชุดข้อมูลที่กำหนดเองของคุณจะทำให้มันง่ายต่อการ spelunk ผ่านรหัสที่มีอยู่เขียนด้วยฉิบหาย นอกจากนี้ tf.data.Datasets
ซึ่ง ClientData
โครงสร้างสามารถซ้ำมากกว่าโดยตรงเพื่อให้โครงสร้างของ numpy
อาร์เรย์ดังนั้น ClientData
วัตถุสามารถนำมาใช้กับกรอบ ML หลามตามก่อนที่จะย้ายไปฉิบหาย
มีหลายรูปแบบที่คุณสามารถทำให้ชีวิตของคุณง่ายขึ้นหากคุณต้องการขยายการจำลองของคุณไปยังหลายเครื่องหรือปรับใช้ ด้านล่างเราจะเดินผ่านไม่กี่วิธีที่เราสามารถใช้ ClientData
และฉิบหายที่จะทำให้ขนาดเล็กย้ำการขนาดใหญ่การทดลองการผลิตประสบการณ์การใช้งานของเราเป็นไปอย่างราบรื่นที่สุดเท่าที่ทำได้
ฉันควรใช้รูปแบบใดในการส่งผ่าน ClientData ไปยัง TFF
เราจะหารือสองประเพณีของฉิบหายของ ClientData
ในเชิงลึก; หากคุณเหมาะสมกับประเภทใดประเภทหนึ่งจากสองหมวดหมู่ด้านล่างนี้ คุณจะเลือกประเภทใดประเภทหนึ่งมากกว่าประเภทอื่นอย่างชัดเจน หากไม่เป็นเช่นนั้น คุณอาจต้องทำความเข้าใจรายละเอียดข้อดีข้อเสียของแต่ละรายการให้ละเอียดยิ่งขึ้นเพื่อตัดสินใจเลือกให้เหมาะสมยิ่งขึ้น
ฉันต้องการวนซ้ำโดยเร็วที่สุดบนเครื่องท้องถิ่น ฉันไม่จำเป็นต้องใช้ประโยชน์จากรันไทม์แบบกระจายของ TFF ได้ง่ายๆ
- คุณต้องการที่จะผ่าน
tf.data.Datasets
ในฉิบหายโดยตรง - นี้จะช่วยให้คุณสามารถตั้งโปรแกรม imperatively กับ
tf.data.Dataset
วัตถุและดำเนินการให้โดยพลการ - ให้ความยืดหยุ่นมากกว่าตัวเลือกด้านล่าง การผลักตรรกะไปยังไคลเอนต์ต้องการให้ตรรกะนี้เป็นอนุกรมได้
- คุณต้องการที่จะผ่าน
ฉันต้องการเรียกใช้การคำนวณแบบรวมศูนย์ในรันไทม์ระยะไกลของ TFF หรือฉันวางแผนที่จะดำเนินการในเร็วๆ นี้
- ในกรณีนี้ คุณต้องการแมปการสร้างชุดข้อมูลและการประมวลผลล่วงหน้ากับไคลเอ็นต์
- ผลนี้ในตัวคุณผ่านเพียงรายการ
client_ids
โดยตรงกับการคำนวณรวมของคุณ - การผลักดันการสร้างชุดข้อมูลและการประมวลผลล่วงหน้าไปยังไคลเอนต์จะช่วยหลีกเลี่ยงปัญหาคอขวดในการทำให้เป็นอนุกรม และเพิ่มประสิทธิภาพอย่างมากกับลูกค้าหลายแสนราย
ตั้งค่าสภาพแวดล้อมโอเพ่นซอร์ส
# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
นำเข้าแพ็คเกจ
import collections
import time
import tensorflow as tf
import tensorflow_federated as tff
การจัดการวัตถุ ClientData
ขอเริ่มต้นด้วยการโหลดและการสำรวจฉิบหายของ EMNIST ClientData
:
client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s] 2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
ตรวจสอบชุดแรกที่สามารถบอกเราว่าสิ่งที่ประเภทของตัวอย่างอยู่ใน ClientData
first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
โปรดทราบว่าชุดข้อมูลที่อัตราผลตอบแทน collections.OrderedDict
วัตถุที่มี pixels
และ label
กุญแจที่พิกเซลเป็นเมตริกซ์ที่มีรูปร่าง [28, 28]
สมมติว่าเราต้องการที่จะแผ่ปัจจัยการผลิตของเราออกมาให้มีรูปทรง [784]
วิธีการหนึ่งที่เป็นไปได้ที่เราจะสามารถทำเช่นนี้จะนำไปใช้ฟังก์ชั่นการประมวลผลก่อนที่จะเรา ClientData
วัตถุ
def preprocess_dataset(dataset):
"""Create batches of 5 examples, and limit to 3 batches."""
def map_fn(input):
return collections.OrderedDict(
x=tf.reshape(input['pixels'], shape=(-1, 784)),
y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
)
return dataset.batch(5).map(
map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)
preprocessed_client_data = client_data.preprocess(preprocess_dataset)
# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
เราอาจต้องการดำเนินการล่วงหน้าที่ซับซ้อนมากขึ้น (และอาจเป็นการเก็บสถานะ) เพิ่มเติม เช่น การสับเปลี่ยน
def preprocess_and_shuffle(dataset):
"""Applies `preprocess_dataset` above and shuffles the result."""
preprocessed = preprocess_dataset(dataset)
return preprocessed.shuffle(buffer_size=5)
preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)
# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])
เชื่อมต่อกับ tff.Computation
ตอนนี้ที่เราสามารถดำเนินกิจวัตรพื้นฐานบางอย่างกับ ClientData
วัตถุเราพร้อมที่จะข้อมูลฟีดกับ tff.Computation
เรากำหนด tff.templates.IterativeProcess
ซึ่งดำเนิน สหพันธ์ Averaging และสำรวจวิธีการที่แตกต่างกันของการส่งผ่านข้อมูลที่มัน
def model_fn():
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(784,)),
tf.keras.layers.Dense(10, kernel_initializer='zeros'),
])
return tff.learning.from_keras_model(
model,
# Note: input spec is the _batched_ shape, and includes the
# label tensor which will be passed to the loss function. This model is
# therefore configured to accept data _after_ it has been preprocessed.
input_spec=collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
trainer = tff.learning.build_federated_averaging_process(
model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))
ก่อนที่เราจะเริ่มต้นการทำงานกับเรื่องนี้ IterativeProcess
หนึ่งความคิดเห็นเกี่ยวกับความหมายของ ClientData
อยู่ในลำดับ ClientData
วัตถุแสดงให้เห็นถึงความสมบูรณ์ของประชากรที่มีอยู่สำหรับการฝึกอบรมแบบ federated ซึ่งโดยทั่วไปจะ ไม่สามารถใช้ได้กับสภาพแวดล้อมการดำเนินการของการผลิตระบบฟลอริด้า และเป็นเฉพาะกับการจำลอง ClientData
แน่นอนช่วยให้ผู้ใช้สามารถที่จะข้ามคอมพิวเตอร์ federated สิ้นเชิงและก็ฝึกรูปแบบที่ฝั่งเซิร์ฟเวอร์ตามปกติผ่าน ClientData.create_tf_dataset_from_all_clients
สภาพแวดล้อมการจำลองของ TFF ทำให้ผู้วิจัยสามารถควบคุมวงรอบนอกได้อย่างสมบูรณ์ โดยเฉพาะอย่างยิ่ง นี่แสดงถึงการพิจารณาความพร้อมใช้งานของไคลเอ็นต์ การออกจากระบบของไคลเอ็นต์ ฯลฯ จะต้องได้รับการแก้ไขโดยผู้ใช้หรือสคริปต์ไดรเวอร์ Python หนึ่งสามารถสำหรับการออกกลางคันของลูกค้าตัวอย่างเช่นรูปแบบโดยการปรับการกระจายการสุ่มตัวอย่างของคุณมากกว่า ClientData's
client_ids
ดังกล่าวว่าผู้ใช้ที่มีข้อมูลมากขึ้น (และตามลําดับอีกต่อไปทำงานคำนวณท้องถิ่น) จะได้รับการคัดเลือกด้วยความน่าจะต่ำกว่า
อย่างไรก็ตาม ในระบบรวมจริง ไคลเอ็นต์ไม่สามารถเลือกโดยผู้ฝึกสอนแบบจำลองได้อย่างชัดเจน การเลือกไคลเอนต์จะมอบหมายให้กับระบบที่กำลังดำเนินการคำนวณแบบรวมศูนย์
ผ่าน tf.data.Datasets
โดยตรงกับฉิบหาย
ทางเลือกหนึ่งที่เรามีสำหรับการเชื่อมต่อระหว่าง ClientData
และ IterativeProcess
คือการสร้าง tf.data.Datasets
ในหลามและผ่านชุดข้อมูลเหล่านี้เพื่อฉิบหาย
ขอให้สังเกตว่าถ้าเราใช้ preprocessed เรา ClientData
ชุดข้อมูลที่เราให้ผลผลิตเป็นประเภทที่เหมาะสมโดยคาดว่ารูปแบบของเราที่กำหนดไว้ข้างต้น
selected_client_ids = preprocessed_and_shuffled.client_ids[:10]
preprocessed_data_for_clients = [
preprocessed_and_shuffled.create_tf_dataset_for_client(
selected_client_ids[i]) for i in range(10)
]
state = trainer.initialize()
for _ in range(5):
t1 = time.time()
state, metrics = trainer.next(state, preprocessed_data_for_clients)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.compat.v1.graph_util.extract_sub_graph` loss 2.9005744457244873, round time 4.576513767242432 loss 3.113278388977051, round time 0.49641919136047363 loss 2.7581865787506104, round time 0.4904160499572754 loss 2.87259578704834, round time 0.48976993560791016 loss 3.1202380657196045, round time 0.6724586486816406
ถ้าเราใช้เส้นทางนี้ แต่เราจะไม่สามารถที่จะย้ายไปนิดจำลอง multimachine ชุดข้อมูลที่เราสร้างในรันไทม์ TensorFlow ท้องถิ่นสามารถจับภาพรัฐจากสภาพแวดล้อมหลามโดยรอบและล้มเหลวในการอนุกรมหรือ deserialization เมื่อพวกเขาพยายามที่จะอ้างอิงรัฐซึ่งไม่สามารถใช้ได้กับพวกเขา นี้สามารถประจักษ์เช่นในข้อผิดพลาดลึกลับจาก TensorFlow ของ tensor_util.cc
:
Check failed: DT_VARIANT == input.dtype() (21 vs. 20)
การสร้างแผนที่และการประมวลผลล่วงหน้าเหนือลูกค้า
เพื่อหลีกเลี่ยงปัญหานี้ฉิบหายแนะนำให้ผู้ใช้ในการพิจารณา instantiation ชุดและ preprocessing เป็นสิ่งที่เกิดขึ้นในประเทศที่ลูกค้าแต่ละคนและจะใช้ผู้ช่วยเหลือฉิบหายหรือ federated_map
ใช้รหัส preprocessing นี้อย่างชัดเจนที่ลูกค้าแต่ละราย
ตามแนวคิดแล้ว เหตุผลในการเลือกสิ่งนี้ชัดเจน: ในรันไทม์ท้องถิ่นของ TFF ลูกค้า "โดยบังเอิญ" เท่านั้นที่สามารถเข้าถึงสภาพแวดล้อม Python ทั่วโลกได้ เนื่องจากข้อเท็จจริงที่ว่าการประสานรวมทั้งหมดเกิดขึ้นในเครื่องเดียว เป็นที่น่าสังเกตว่า ณ จุดนี้ความคิดที่คล้ายคลึงกันก่อให้เกิดปรัชญาการทำงานข้ามแพลตฟอร์มของ TFF ที่เรียงลำดับได้เสมอและใช้งานได้
ฉิบหายทำให้การเปลี่ยนแปลงดังกล่าวโดยง่ายผ่านทาง ClientData's
แอตทริบิวต์ dataset_computation
เป็น tff.Computation
ซึ่งจะนำ client_id
และผลตอบแทนที่เกี่ยวข้อง tf.data.Dataset
โปรดทราบว่า preprocess
ก็ทำงานกับ dataset_computation
; dataset_computation
แอตทริบิวต์ของ preprocessed ClientData
ประกอบด้วยท่อ preprocessing ทั้งเราเพียงแค่กำหนด:
print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing: (string -> <label=int32,pixels=float32[28,28]>*) dataset computation with preprocessing: (string -> <x=float32[?,784],y=int64[?,1]>*)
เราสามารถเรียก dataset_computation
และได้รับชุดข้อมูลที่กระตือรือร้นในรันไทม์หลาม แต่อำนาจที่แท้จริงของวิธีการนี้จะใช้สิทธิเมื่อเราประกอบกับกระบวนการซ้ำหรือการคำนวณเพื่อหลีกเลี่ยงการ materializing ชุดข้อมูลเหล่านี้ในรันไทม์กระตือรือร้นระดับโลกที่ทุกคนอีก ฉิบหายมีฟังก์ชั่นผู้ช่วย tff.simulation.compose_dataset_computation_with_iterative_process
ซึ่งสามารถใช้ในการทำตรงนี้
trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
preprocessed_and_shuffled.dataset_computation, trainer)
ทั้งสองนี้ tff.templates.IterativeProcesses
และเหนือวิ่งในทางเดียวกัน; แต่อดีตยอมรับชุดข้อมูลลูกค้า preprocessed และหลังยอมรับสตริงตัวแทนรหัสลูกค้า, การจัดการโครงการก่อสร้างของทั้งชุดและ preprocessing ในร่างกายของ - ในความเป็นจริง state
สามารถส่งผ่านระหว่างสอง
for _ in range(5):
t1 = time.time()
state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
t2 = time.time()
print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023 loss 2.7670371532440186, round time 0.5207102298736572 loss 2.665048122406006, round time 0.5302855968475342 loss 2.7213189601898193, round time 0.5313887596130371 loss 2.580148935317993, round time 0.5283482074737549
ขยายสู่ลูกค้าจำนวนมาก
trainer_accepting_ids
ทันทีสามารถนำมาใช้ในการรันไทม์ multimachine ฉิบหายของและหลีกเลี่ยงการรายอื่น tf.data.Datasets
และการควบคุม (และดังนั้นจึง serializing พวกเขาและส่งพวกเขาออกไปคนงาน)
สิ่งนี้ช่วยเร่งความเร็วการจำลองแบบกระจาย โดยเฉพาะอย่างยิ่งกับไคลเอนต์จำนวนมาก และเปิดใช้งานการรวมระดับกลางเพื่อหลีกเลี่ยงค่าใช้จ่ายการทำให้เป็นอนุกรม/ดีซีเรียลไลซ์เซชันที่คล้ายคลึงกัน
ทางเลือกเชิงลึก: การเขียนตรรกะก่อนการประมวลผลใน TFF . ด้วยตนเอง
TFF ได้รับการออกแบบมาเพื่อการจัดองค์ประกอบตั้งแต่ต้นจนจบ ประเภทขององค์ประกอบที่เพิ่งดำเนินการโดยผู้ช่วยของ TFF นั้นอยู่ในการควบคุมของเราอย่างเต็มที่ในฐานะผู้ใช้ เราจะได้มีการคำนวณด้วยตนเองประกอบการ preprocessing เราก็กำหนดไว้กับครูฝึกของตัวเอง next
มากเพียง:
selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)
@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
return trainer.next(server_state, preprocessed_data)
manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)
อันที่จริง นี่คือสิ่งที่ผู้ช่วยที่เราใช้ทำอย่างมีประสิทธิภาพภายใต้ประทุน (รวมถึงการตรวจสอบประเภทและการจัดการที่เหมาะสม) เรายังสามารถได้แสดงตรรกะเดียวกันแตกต่างกันเล็กน้อยโดย serializing preprocess_and_shuffle
เป็น tff.Computation
และย่อยสลาย federated_map
เข้าไปในขั้นตอนเดียวซึ่งสร้างชุดข้อมูลยกเลิก preprocessed และอื่น ๆ ซึ่งวิ่ง preprocess_and_shuffle
ที่ลูกค้าแต่ละราย
เราสามารถยืนยันได้ว่าเส้นทางแบบแมนนวลมากกว่านี้ส่งผลให้มีการคำนวณด้วยลายเซ็นประเภทเดียวกับตัวช่วยของ TFF (ชื่อพารามิเตอร์โมดูลาร์):
print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>) (<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)