TensorFlow.org पर देखें | Google Colab में चलाएं | GitHub पर स्रोत देखें | नोटबुक डाउनलोड करें |
इस ट्यूटोरियल में, हम पीछे डिजाइन सिद्धांतों की व्याख्या tff.aggregators
मॉड्यूल और सर्वर के लिए ग्राहकों से मूल्यों की कस्टम एकत्रीकरण को लागू करने के लिए सर्वोत्तम प्रथाओं।
पूर्वापेक्षाएँ। इस ट्यूटोरियल मान लिया गया है आप पहले से ही की बुनियादी अवधारणाओं से परिचित हैं संघीय कोर ऐसे प्लेसमेंट (के रूप में tff.SERVER
, tff.CLIENTS
), कैसे TFF संगणना का प्रतिनिधित्व करता है ( tff.tf_computation
, tff.federated_computation
) और उनके प्रकार हस्ताक्षर।
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
डिजाइन सारांश
TFF में, "एकत्रीकरण" पर मानों का एक सेट के आंदोलन को संदर्भित करता है tff.CLIENTS
पर एक ही प्रकार के कुल मूल्य का उत्पादन करने के tff.SERVER
। यही है, प्रत्येक व्यक्तिगत ग्राहक मूल्य उपलब्ध नहीं होना चाहिए। उदाहरण के लिए फ़ेडरेटेड लर्निंग में, क्लाइंट मॉडल अपडेट को सर्वर पर ग्लोबल मॉडल पर लागू करने के लिए एक समग्र मॉडल अपडेट प्राप्त करने के लिए औसत किया जाता है।
इस तरह के रूप में इस लक्ष्य को पूरा करने ऑपरेटरों के अलावा tff.federated_sum
, TFF प्रदान करता है tff.templates.AggregationProcess
(एक स्टेटफुल प्रक्रिया ) जो एकत्रीकरण गणना के लिए प्रकार हस्ताक्षर formalizes तो यह एक सरल योग से अधिक जटिल रूपों को सामान्यीकरण कर सकते हैं।
के मुख्य घटकों tff.aggregators
मॉड्यूल के निर्माण के लिए कारखाने हैं AggregationProcess
है, जो दो पहलुओं में TFF के आम तौर पर उपयोगी और बदले बिल्डिंग ब्लॉक होने के लिए तैयार कर रहे हैं:
- पैरामीटरयुक्त संगणनाएं। एकत्रीकरण एक स्वतंत्र निर्माण खंड है कि साथ काम करने के लिए डिज़ाइन अन्य TFF मॉड्यूल में जोड़ा जा सकता है
tff.aggregators
उनके आवश्यक एकत्रीकरण parameterize करने के लिए।
उदाहरण:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- एकत्रीकरण रचना। अधिक जटिल समग्र एकत्रीकरण बनाने के लिए अन्य एकत्रीकरण बिल्डिंग ब्लॉकों के साथ एक एकत्रीकरण बिल्डिंग ब्लॉक बनाया जा सकता है।
उदाहरण:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
इस ट्यूटोरियल के बाकी हिस्सों में बताया गया है कि इन दो लक्ष्यों को कैसे प्राप्त किया जाता है।
एकत्रीकरण प्रक्रिया
हम पहले संक्षेप में प्रस्तुत tff.templates.AggregationProcess
, और इसके निर्माण के लिए कारखाना पैटर्न के साथ पालन करें।
tff.templates.AggregationProcess
एक है tff.templates.MeasuredProcess
एकत्रीकरण के लिए निर्दिष्ट प्रकार हस्ताक्षरों के साथ। विशेष रूप से, initialize
और next
कार्यों निम्न प्रकार के हस्ताक्षर होते हैं:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
(प्रकार के राज्य state_type
) सर्वर पर रखा जाना चाहिए। next
समारोह इनपुट तर्क राज्य और एक मूल्य के एकत्रित होने की (प्रकार के रूप में लेता है value_type
) ग्राहकों पर रखा। *
साधन एक भारित मतलब में उदाहरण के वजन के लिए अन्य इनपुट तर्क वैकल्पिक,। यह एक अद्यतन स्थिति वस्तु, सर्वर पर रखे गए समान प्रकार का समग्र मान और कुछ माप देता है।
ध्यान दें कि दोनों राज्य के लिए सजा के बीच भेजा जाता next
समारोह, और सूचना माप की एक विशिष्ट निष्पादन के आधार पर किसी भी जानकारी को रिपोर्ट करने के लिए next
समारोह, खाली हो सकता है। फिर भी, टीएफएफ के अन्य हिस्सों के लिए स्पष्ट अनुबंध का पालन करने के लिए उन्हें स्पष्ट रूप से निर्दिष्ट किया जाना चाहिए।
अन्य TFF मॉड्यूल, उदाहरण में मॉडल अद्यतन के लिए tff.learning
, उपयोग करने के लिए उम्मीद कर रहे हैं tff.templates.AggregationProcess
parameterize करने के लिए कैसे मान एकत्रित कर रहे हैं। हालाँकि, वास्तव में एकत्रित मूल्य क्या हैं और उनके प्रकार के हस्ताक्षर क्या हैं, यह प्रशिक्षित किए जा रहे मॉडल के अन्य विवरणों और इसे करने के लिए उपयोग किए जाने वाले शिक्षण एल्गोरिदम पर निर्भर करता है।
संगणना के अन्य पहलुओं के एकत्रीकरण स्वतंत्र बनाने के लिए हम कारखाने पैटर्न का उपयोग - हम उपयुक्त बनाने tff.templates.AggregationProcess
एक बार वस्तुओं की प्रासंगिक प्रकार हस्ताक्षर एकत्रित होने की उपलब्ध हैं, लागू द्वारा create
कारखाने की विधि। इस प्रकार केवल पुस्तकालय लेखकों के लिए एकत्रीकरण प्रक्रिया के प्रत्यक्ष संचालन की आवश्यकता है, जो इस निर्माण के लिए जिम्मेदार हैं।
एकत्रीकरण प्रक्रिया कारखाने
भारित और भारित एकत्रीकरण के लिए दो सार आधार कारखाने वर्ग हैं। उनके create
विधि मान का प्रकार हस्ताक्षर लेता एकत्रित होने की और एक रिटर्न tff.templates.AggregationProcess
ऐसे मूल्यों के एकत्रीकरण के लिए।
के द्वारा बनाई गई प्रक्रिया tff.aggregators.UnweightedAggregationFactory
(1) सर्वर पर राज्य और (2) निर्दिष्ट प्रकार के मूल्य: दो इनपुट तर्क लेता value_type
।
एक उदाहरण दिया गया है tff.aggregators.SumFactory
।
के द्वारा बनाई गई प्रक्रिया tff.aggregators.WeightedAggregationFactory
(1) सर्वर पर राज्य, (2) निर्दिष्ट प्रकार के मूल्य: तीन इनपुट तर्क लेता value_type
और (3) प्रकार का वजन weight_type
जब इसकी लागू के रूप में कारखाने के उपयोगकर्ता द्वारा निर्दिष्ट, create
विधि।
एक उदाहरण दिया गया है tff.aggregators.MeanFactory
जो एक भारित मतलब गणना करता है।
फैक्ट्री पैटर्न यह है कि हम ऊपर बताए गए पहले लक्ष्य को कैसे प्राप्त करते हैं; वह एकत्रीकरण एक स्वतंत्र बिल्डिंग ब्लॉक है। उदाहरण के लिए, जब बदलते हुए मॉडल वेरिएबल को प्रशिक्षित किया जा सकता है, तो एक जटिल एकत्रीकरण को बदलने की आवश्यकता नहीं होती है; जब इस तरह के रूप में एक विधि द्वारा उपयोग यह प्रतिनिधित्व करने कारखाना एक अलग प्रकार का हस्ताक्षर द्वारा सक्रिय किया जाएगा tff.learning.build_federated_averaging_process
।
रचनाएं
याद रखें कि एक सामान्य एकत्रीकरण प्रक्रिया (ए) क्लाइंट पर मूल्यों के कुछ प्रीप्रोसेसिंग, (बी) क्लाइंट से सर्वर तक मूल्यों की आवाजाही, और (सी) सर्वर पर समेकित मूल्य के कुछ पोस्टप्रोसेसिंग को समाहित कर सकती है। दूसरे के ऊपर, एकत्रीकरण रचना घोषित लक्ष्य, अंदर महसूस किया है tff.aggregators
संरचना द्वारा मॉड्यूल एकत्रीकरण कारखानों ऐसे के कार्यान्वयन उस भाग (ख) एक और एकत्रीकरण कारखाने के लिए प्रत्यायोजित किया जा सकता।
एकल फैक्ट्री वर्ग के भीतर सभी आवश्यक तर्कों को लागू करने के बजाय, कार्यान्वयन डिफ़ॉल्ट रूप से एकत्रीकरण के लिए प्रासंगिक एकल पहलू पर केंद्रित होते हैं। जरूरत पड़ने पर, यह पैटर्न हमें बिल्डिंग ब्लॉक्स को एक-एक करके बदलने में सक्षम बनाता है।
एक उदाहरण भारित है tff.aggregators.MeanFactory
। इसका कार्यान्वयन क्लाइंट पर प्रदान किए गए मूल्यों और भारों को गुणा करता है, फिर भारित मूल्यों और भार दोनों को स्वतंत्र रूप से जोड़ता है, और फिर भारित मानों के योग को सर्वर पर भार के योग से विभाजित करता है। सीधे का उपयोग करके summations को लागू करने के बजाय tff.federated_sum
ऑपरेटर, योग के दो उदाहरणों को सौंपा जाता है tff.aggregators.SumFactory
।
इस तरह की संरचना दो डिफ़ॉल्ट योगों को अलग-अलग कारखानों द्वारा प्रतिस्थापित करना संभव बनाती है, जो अलग-अलग योग का एहसास करते हैं। उदाहरण के लिए, एक tff.aggregators.SecureSumFactory
, या के एक कस्टम कार्यान्वयन tff.aggregators.UnweightedAggregationFactory
। इसके विपरीत, समय, tff.aggregators.MeanFactory
ही एक और कारखाने के एक आंतरिक एकत्रीकरण जैसे हो सकता है tff.aggregators.clipping_factory
, अगर मान औसत से पहले काटा गया हो रहे हैं।
पिछले देखें ट्यूनिंग सीखने के लिए एकत्रित की सिफारिश में मौजूदा कारखानों का उपयोग कर रचना तंत्र की receommended उपयोगों के लिए ट्यूटोरियल tff.aggregators
मॉड्यूल।
उदाहरण के द्वारा सर्वोत्तम अभ्यास
हम वर्णन करने के लिए जा रहे हैं tff.aggregators
एक साधारण उदाहरण के कार्य को लागू करने से विस्तार से अवधारणाओं, और यह उत्तरोत्तर अधिक सामान्य बनाना। सीखने का दूसरा तरीका मौजूदा कारखानों के कार्यान्वयन को देखना है।
import collections
import tensorflow as tf
import tensorflow_federated as tff
इसके बजाय संक्षेप का value
, उदाहरण के कार्य योग करने के लिए है value * 2.0
और उसके बाद से राशि विभाजित 2.0
। एकत्रीकरण परिणाम इस प्रकार गणितीय सीधे संक्षेप के बराबर है value
, और तीन भागों से मिलकर रूप में सोचा जा सकता है: (1) ग्राहकों पर स्केलिंग (2) ग्राहकों (3) सर्वर पर unscaling भर में संक्षेप।
डिजाइन ऊपर बताया गया है के बाद, तर्क का एक उपवर्ग के रूप में लागू किया जाएगा tff.aggregators.UnweightedAggregationFactory
है, जो उचित बनाता tff.templates.AggregationProcess
जब किसी दिए गए value_type
कुल करने के लिए:
न्यूनतम कार्यान्वयन
उदाहरण कार्य के लिए, आवश्यक गणना हमेशा समान होती है, इसलिए राज्य का उपयोग करने की कोई आवश्यकता नहीं है। यह इस प्रकार रिक्त होता है, और के रूप में प्रस्तुत किया जाता है tff.federated_value((), tff.SERVER)
। माप के लिए भी यही है, अभी के लिए।
कार्य का न्यूनतम कार्यान्वयन इस प्रकार है:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
सब कुछ अपेक्षित रूप से काम करता है या नहीं, इसे निम्नलिखित कोड से सत्यापित किया जा सकता है:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
स्टेटफुलनेस और माप
टीएफएफ में व्यापक रूप से स्टेटफुलनेस का उपयोग उन गणनाओं का प्रतिनिधित्व करने के लिए किया जाता है जिन्हें पुनरावृत्त रूप से निष्पादित करने और प्रत्येक पुनरावृत्ति के साथ बदलने की उम्मीद है। उदाहरण के लिए, सीखने की गणना की स्थिति में सीखे जा रहे मॉडल का भार होता है।
यह समझाने के लिए कि एकत्रीकरण गणना में राज्य का उपयोग कैसे किया जाता है, हम उदाहरण कार्य को संशोधित करते हैं। इसके बजाय गुणा करने का value
द्वारा 2.0
, हम गुणा यह यात्रा सूचकांक द्वारा - समय की संख्या एकत्रीकरण मार डाला गया है।
ऐसा करने के लिए, हमें पुनरावृत्ति सूचकांक का ट्रैक रखने का एक तरीका चाहिए, जो राज्य की अवधारणा के माध्यम से प्राप्त किया जाता है। में initialize_fn
, बजाय एक खाली राज्य बनाने के लिए, हम राज्य को प्रारंभ एक अदिश शून्य होने के लिए। फिर, राज्य में इस्तेमाल किया जा सकता next_fn
(1) द्वारा वेतन वृद्धि: तीन चरणों में 1.0
, (2) गुणा करने के लिए उपयोग value
, और (3) नई जानकारी वाल राज्य के रूप में वापसी।
यह हो जाने पर, आप नोट कर सकते हैं: लेकिन वास्तव में इसके बाद के संस्करण के रूप में ही कोड अपेक्षित ढंग से सभी कार्यों को सत्यापित करने के लिए इस्तेमाल किया जा सकता है। मुझे कैसे पता चलेगा कि वास्तव में कुछ बदल गया है?
अच्छा प्रश्न! यहीं पर मापन की अवधारणा उपयोगी हो जाती है। सामान्य तौर पर, माप की एक एकल निष्पादन के लिए प्रासंगिक किसी भी मूल्य रिपोर्ट कर सकते हैं next
समारोह है, जो की निगरानी के लिए इस्तेमाल किया जा सकता। इस मामले में, यह हो सकता है summed_value
पिछले उदाहरण से। यही है, "अनस्केलिंग" चरण से पहले का मान, जो पुनरावृत्ति सूचकांक पर निर्भर होना चाहिए। फिर, यह व्यवहार में आवश्यक रूप से उपयोगी नहीं है, लेकिन प्रासंगिक तंत्र को दर्शाता है।
कार्य का राज्यव्यापी उत्तर इस प्रकार दिखता है:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
ध्यान दें कि state
कि में आता है next_fn
इनपुट के रूप में सर्वर पर रखा गया है। ये ग्राहकों पर इसका इस्तेमाल करने में, यह पहले सूचित किया जाना करने के लिए जो का उपयोग कर हासिल की है जरूरत tff.federated_broadcast
ऑपरेटर।
उम्मीद के रूप में सभी काम करता है सत्यापित करने के लिए, अब हम सूचना देख सकते हैं measurements
, है, जो निष्पादन के प्रत्येक दौर के साथ अलग होना चाहिए, भले ही एक ही साथ रन client_data
।
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
संरचित प्रकार
फ़ेडरेटेड लर्निंग में प्रशिक्षित मॉडल के मॉडल वेट को आमतौर पर एक टेंसर के बजाय टेंसर के संग्रह के रूप में दर्शाया जाता है। TFF में, यह के रूप में प्रस्तुत किया जाता है tff.StructType
और आम तौर पर उपयोगी एकत्रीकरण कारखानों संरचित प्रकार स्वीकार करने में सक्षम होने की जरूरत है।
हालांकि, ऊपर के उदाहरण में, हम केवल एक साथ काम किया tff.TensorType
वस्तु। हम एक साथ एकत्रीकरण प्रक्रिया बनाने के लिए पिछले कारखाने का उपयोग करने का प्रयास करें tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
, हम एक अजीब त्रुटि क्योंकि मिल TensorFlow एक गुणा करने की कोशिश करेंगे tf.Tensor
और एक list
।
समस्या यह है कि बजाय एक निरंतर द्वारा tensors की संरचना गुणा करने का, हम एक निरंतर द्वारा संरचना में प्रत्येक टेन्सर गुणा करने की आवश्यकता है। इस समस्या के लिए हमेशा की तरह समाधान का उपयोग करने के लिए है tf.nest
बनाया मॉड्यूल के अंदर tff.tf_computation
रों।
पिछले संस्करण की ExampleTaskFactory
इस प्रकार के रूप में संरचित प्रकार के साथ संगत इस प्रकार है:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
यह उदाहरण एक पैटर्न पर प्रकाश डालता है जो TFF कोड की संरचना करते समय अनुसरण करने के लिए उपयोगी हो सकता है। बहुत ही सरल संचालन के साथ काम नहीं करते हैं, तो कोड अधिक स्पष्ट हो जाता है जब tff.tf_computation
रों है कि एक के अंदर ब्लॉक के निर्माण के रूप में इस्तेमाल किया जाएगा tff.federated_computation
एक अलग जगह में बनाया जाता है। के अंदर tff.federated_computation
, इन बिल्डिंग ब्लॉक्स केवल आंतरिक ऑपरेटरों का उपयोग जुड़े हुए हैं।
यह सत्यापित करने के लिए कि यह अपेक्षा के अनुरूप काम करता है:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
आंतरिक एकत्रीकरण
अंतिम चरण वैकल्पिक रूप से अन्य कारखानों को वास्तविक एकत्रीकरण के प्रतिनिधिमंडल को सक्षम करना है, ताकि विभिन्न एकत्रीकरण तकनीकों की आसान संरचना की अनुमति मिल सके।
यह एक वैकल्पिक बनाने के द्वारा हासिल की है inner_factory
हमारे के निर्माता में तर्क ExampleTaskFactory
। यदि निर्दिष्ट नहीं, tff.aggregators.SumFactory
प्रयोग किया जाता है, जो लागू होता है tff.federated_sum
पिछले अनुभाग में सीधे इस्तेमाल किया ऑपरेटर।
जब create
कहा जाता है, हम पहले कॉल कर सकते हैं create
के inner_factory
ही साथ आंतरिक एकत्रीकरण प्रक्रिया बनाने के लिए value_type
।
हमारी प्रक्रिया द्वारा वापस की स्थिति initialize_fn
राज्य द्वारा "इस" प्रक्रिया बनाया है, और अभी बनाया आंतरिक प्रक्रिया के राज्य: दो भागों की एक रचना है।
के कार्यान्वयन next_fn
कि वास्तविक एकत्रीकरण में अलग है को सौंपा जाता है next
आंतरिक प्रक्रिया के समारोह, और कैसे अंतिम आउटपुट बना है में। राज्य फिर "इस" और "आंतरिक" राज्य से बना है, और माप एक के रूप में एक समान तरीके से बना रहे हैं OrderedDict
।
निम्नलिखित इस तरह के पैटर्न का कार्यान्वयन है।
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
जब करने के लिए सौंपने inner_process.next
समारोह, वापसी संरचना पर हम पाते हैं एक है tff.templates.MeasuredProcessOutput
- एक ही तीन क्षेत्रों के साथ, state
, result
और measurements
। जब बना एकत्रीकरण प्रक्रिया की समग्र वापसी संरचना का निर्माण, state
और measurements
क्षेत्रों आम तौर पर बना है और एक साथ वापस आ जाना चाहिए। इसके विपरीत, result
मूल्य के लिए क्षेत्र मेल खाती एकत्रित किया जा रहा है और बना एकत्रीकरण बजाय "के माध्यम से बहती"।
state
वस्तु कारखाने के एक कार्यान्वयन विस्तार के रूप में देखा जाना चाहिए, और इस तरह रचना किसी भी संरचना के हो सकता है। हालांकि, measurements
पत्र व्यवहार करने के मूल्यों कुछ बिंदु पर उपयोगकर्ता को सूचित किया जाना है। इसलिए, हम उपयोग करने की सलाह देते OrderedDict
बना इस तरह के नामकरण कि यह जहां एक रचना में एक मीट्रिक सूचना से आता है स्पष्ट हो जाएगा के साथ,।
यह भी ध्यान रखें के उपयोग tff.federated_zip
ऑपरेटर। state
वस्तु बनाया प्रक्रिया द्वारा contolled एक होना चाहिए tff.FederatedType
। हम बजाय लौटा था तो (this_state, inner_state)
में initialize_fn
, अपनी वापसी प्रकार हस्ताक्षर एक होगा tff.StructType
की एक 2-टपल युक्त tff.FederatedType
रों। के उपयोग tff.federated_zip
"लिफ्टों" tff.FederatedType
शीर्ष स्तर पर। यह इसी तरह से प्रयोग किया जाता है next_fn
जब तैयारी कर राज्य और माप लौटा दी है।
अंत में, हम देख सकते हैं कि इसका उपयोग डिफ़ॉल्ट आंतरिक एकत्रीकरण के साथ कैसे किया जा सकता है:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... और एक अलग आंतरिक एकत्रीकरण के साथ। उदाहरण के लिए, एक ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
सारांश
इस ट्यूटोरियल में, हमने एक सामान्य-उद्देश्यीय एग्रीगेशन बिल्डिंग ब्लॉक बनाने के लिए पालन करने के लिए सर्वोत्तम अभ्यासों के बारे में बताया, जिसे एग्रीगेशन फैक्ट्री के रूप में दर्शाया गया है। व्यापकता दो तरह से डिजाइन के इरादे से आती है:
- पैरामीटरयुक्त संगणनाएं। एकत्रीकरण एक स्वतंत्र निर्माण खंड है कि अन्य TFF के साथ काम करने के लिए डिजाइन मॉड्यूल में जोड़ा जा सकता है
tff.aggregators
जैसे उनके आवश्यक एकत्रीकरण parameterize करने के लिए,tff.learning.build_federated_averaging_process
। - एकत्रीकरण रचना। अधिक जटिल समग्र एकत्रीकरण बनाने के लिए अन्य एकत्रीकरण बिल्डिंग ब्लॉकों के साथ एक एकत्रीकरण बिल्डिंग ब्लॉक बनाया जा सकता है।