कस्टम एकत्रीकरण लागू करना

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

इस ट्यूटोरियल में, हम पीछे डिजाइन सिद्धांतों की व्याख्या tff.aggregators मॉड्यूल और सर्वर के लिए ग्राहकों से मूल्यों की कस्टम एकत्रीकरण को लागू करने के लिए सर्वोत्तम प्रथाओं।

पूर्वापेक्षाएँ। इस ट्यूटोरियल मान लिया गया है आप पहले से ही की बुनियादी अवधारणाओं से परिचित हैं संघीय कोर ऐसे प्लेसमेंट (के रूप में tff.SERVER , tff.CLIENTS ), कैसे TFF संगणना का प्रतिनिधित्व करता है ( tff.tf_computation , tff.federated_computation ) और उनके प्रकार हस्ताक्षर।

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

डिजाइन सारांश

TFF में, "एकत्रीकरण" पर मानों का एक सेट के आंदोलन को संदर्भित करता है tff.CLIENTS पर एक ही प्रकार के कुल मूल्य का उत्पादन करने के tff.SERVER । यही है, प्रत्येक व्यक्तिगत ग्राहक मूल्य उपलब्ध नहीं होना चाहिए। उदाहरण के लिए फ़ेडरेटेड लर्निंग में, क्लाइंट मॉडल अपडेट को सर्वर पर ग्लोबल मॉडल पर लागू करने के लिए एक समग्र मॉडल अपडेट प्राप्त करने के लिए औसत किया जाता है।

इस तरह के रूप में इस लक्ष्य को पूरा करने ऑपरेटरों के अलावा tff.federated_sum , TFF प्रदान करता है tff.templates.AggregationProcess (एक स्टेटफुल प्रक्रिया ) जो एकत्रीकरण गणना के लिए प्रकार हस्ताक्षर formalizes तो यह एक सरल योग से अधिक जटिल रूपों को सामान्यीकरण कर सकते हैं।

के मुख्य घटकों tff.aggregators मॉड्यूल के निर्माण के लिए कारखाने हैं AggregationProcess है, जो दो पहलुओं में TFF के आम तौर पर उपयोगी और बदले बिल्डिंग ब्लॉक होने के लिए तैयार कर रहे हैं:

  1. पैरामीटरयुक्त संगणनाएं। एकत्रीकरण एक स्वतंत्र निर्माण खंड है कि साथ काम करने के लिए डिज़ाइन अन्य TFF मॉड्यूल में जोड़ा जा सकता है tff.aggregators उनके आवश्यक एकत्रीकरण parameterize करने के लिए।

उदाहरण:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. एकत्रीकरण रचना। अधिक जटिल समग्र एकत्रीकरण बनाने के लिए अन्य एकत्रीकरण बिल्डिंग ब्लॉकों के साथ एक एकत्रीकरण बिल्डिंग ब्लॉक बनाया जा सकता है।

उदाहरण:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

इस ट्यूटोरियल के बाकी हिस्सों में बताया गया है कि इन दो लक्ष्यों को कैसे प्राप्त किया जाता है।

एकत्रीकरण प्रक्रिया

हम पहले संक्षेप में प्रस्तुत tff.templates.AggregationProcess , और इसके निर्माण के लिए कारखाना पैटर्न के साथ पालन करें।

tff.templates.AggregationProcess एक है tff.templates.MeasuredProcess एकत्रीकरण के लिए निर्दिष्ट प्रकार हस्ताक्षरों के साथ। विशेष रूप से, initialize और next कार्यों निम्न प्रकार के हस्ताक्षर होते हैं:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

(प्रकार के राज्य state_type ) सर्वर पर रखा जाना चाहिए। next समारोह इनपुट तर्क राज्य और एक मूल्य के एकत्रित होने की (प्रकार के रूप में लेता है value_type ) ग्राहकों पर रखा। * साधन एक भारित मतलब में उदाहरण के वजन के लिए अन्य इनपुट तर्क वैकल्पिक,। यह एक अद्यतन स्थिति वस्तु, सर्वर पर रखे गए समान प्रकार का समग्र मान और कुछ माप देता है।

ध्यान दें कि दोनों राज्य के लिए सजा के बीच भेजा जाता next समारोह, और सूचना माप की एक विशिष्ट निष्पादन के आधार पर किसी भी जानकारी को रिपोर्ट करने के लिए next समारोह, खाली हो सकता है। फिर भी, टीएफएफ के अन्य हिस्सों के लिए स्पष्ट अनुबंध का पालन करने के लिए उन्हें स्पष्ट रूप से निर्दिष्ट किया जाना चाहिए।

अन्य TFF मॉड्यूल, उदाहरण में मॉडल अद्यतन के लिए tff.learning , उपयोग करने के लिए उम्मीद कर रहे हैं tff.templates.AggregationProcess parameterize करने के लिए कैसे मान एकत्रित कर रहे हैं। हालाँकि, वास्तव में एकत्रित मूल्य क्या हैं और उनके प्रकार के हस्ताक्षर क्या हैं, यह प्रशिक्षित किए जा रहे मॉडल के अन्य विवरणों और इसे करने के लिए उपयोग किए जाने वाले शिक्षण एल्गोरिदम पर निर्भर करता है।

संगणना के अन्य पहलुओं के एकत्रीकरण स्वतंत्र बनाने के लिए हम कारखाने पैटर्न का उपयोग - हम उपयुक्त बनाने tff.templates.AggregationProcess एक बार वस्तुओं की प्रासंगिक प्रकार हस्ताक्षर एकत्रित होने की उपलब्ध हैं, लागू द्वारा create कारखाने की विधि। इस प्रकार केवल पुस्तकालय लेखकों के लिए एकत्रीकरण प्रक्रिया के प्रत्यक्ष संचालन की आवश्यकता है, जो इस निर्माण के लिए जिम्मेदार हैं।

एकत्रीकरण प्रक्रिया कारखाने

भारित और भारित एकत्रीकरण के लिए दो सार आधार कारखाने वर्ग हैं। उनके create विधि मान का प्रकार हस्ताक्षर लेता एकत्रित होने की और एक रिटर्न tff.templates.AggregationProcess ऐसे मूल्यों के एकत्रीकरण के लिए।

के द्वारा बनाई गई प्रक्रिया tff.aggregators.UnweightedAggregationFactory (1) सर्वर पर राज्य और (2) निर्दिष्ट प्रकार के मूल्य: दो इनपुट तर्क लेता value_type

एक उदाहरण दिया गया है tff.aggregators.SumFactory

के द्वारा बनाई गई प्रक्रिया tff.aggregators.WeightedAggregationFactory (1) सर्वर पर राज्य, (2) निर्दिष्ट प्रकार के मूल्य: तीन इनपुट तर्क लेता value_type और (3) प्रकार का वजन weight_type जब इसकी लागू के रूप में कारखाने के उपयोगकर्ता द्वारा निर्दिष्ट, create विधि।

एक उदाहरण दिया गया है tff.aggregators.MeanFactory जो एक भारित मतलब गणना करता है।

फैक्ट्री पैटर्न यह है कि हम ऊपर बताए गए पहले लक्ष्य को कैसे प्राप्त करते हैं; वह एकत्रीकरण एक स्वतंत्र बिल्डिंग ब्लॉक है। उदाहरण के लिए, जब बदलते हुए मॉडल वेरिएबल को प्रशिक्षित किया जा सकता है, तो एक जटिल एकत्रीकरण को बदलने की आवश्यकता नहीं होती है; जब इस तरह के रूप में एक विधि द्वारा उपयोग यह प्रतिनिधित्व करने कारखाना एक अलग प्रकार का हस्ताक्षर द्वारा सक्रिय किया जाएगा tff.learning.build_federated_averaging_process

रचनाएं

याद रखें कि एक सामान्य एकत्रीकरण प्रक्रिया (ए) क्लाइंट पर मूल्यों के कुछ प्रीप्रोसेसिंग, (बी) क्लाइंट से सर्वर तक मूल्यों की आवाजाही, और (सी) सर्वर पर समेकित मूल्य के कुछ पोस्टप्रोसेसिंग को समाहित कर सकती है। दूसरे के ऊपर, एकत्रीकरण रचना घोषित लक्ष्य, अंदर महसूस किया है tff.aggregators संरचना द्वारा मॉड्यूल एकत्रीकरण कारखानों ऐसे के कार्यान्वयन उस भाग (ख) एक और एकत्रीकरण कारखाने के लिए प्रत्यायोजित किया जा सकता।

एकल फैक्ट्री वर्ग के भीतर सभी आवश्यक तर्कों को लागू करने के बजाय, कार्यान्वयन डिफ़ॉल्ट रूप से एकत्रीकरण के लिए प्रासंगिक एकल पहलू पर केंद्रित होते हैं। जरूरत पड़ने पर, यह पैटर्न हमें बिल्डिंग ब्लॉक्स को एक-एक करके बदलने में सक्षम बनाता है।

एक उदाहरण भारित है tff.aggregators.MeanFactory । इसका कार्यान्वयन क्लाइंट पर प्रदान किए गए मूल्यों और भारों को गुणा करता है, फिर भारित मूल्यों और भार दोनों को स्वतंत्र रूप से जोड़ता है, और फिर भारित मानों के योग को सर्वर पर भार के योग से विभाजित करता है। सीधे का उपयोग करके summations को लागू करने के बजाय tff.federated_sum ऑपरेटर, योग के दो उदाहरणों को सौंपा जाता है tff.aggregators.SumFactory

इस तरह की संरचना दो डिफ़ॉल्ट योगों को अलग-अलग कारखानों द्वारा प्रतिस्थापित करना संभव बनाती है, जो अलग-अलग योग का एहसास करते हैं। उदाहरण के लिए, एक tff.aggregators.SecureSumFactory , या के एक कस्टम कार्यान्वयन tff.aggregators.UnweightedAggregationFactory । इसके विपरीत, समय, tff.aggregators.MeanFactory ही एक और कारखाने के एक आंतरिक एकत्रीकरण जैसे हो सकता है tff.aggregators.clipping_factory , अगर मान औसत से पहले काटा गया हो रहे हैं।

पिछले देखें ट्यूनिंग सीखने के लिए एकत्रित की सिफारिश में मौजूदा कारखानों का उपयोग कर रचना तंत्र की receommended उपयोगों के लिए ट्यूटोरियल tff.aggregators मॉड्यूल।

उदाहरण के द्वारा सर्वोत्तम अभ्यास

हम वर्णन करने के लिए जा रहे हैं tff.aggregators एक साधारण उदाहरण के कार्य को लागू करने से विस्तार से अवधारणाओं, और यह उत्तरोत्तर अधिक सामान्य बनाना। सीखने का दूसरा तरीका मौजूदा कारखानों के कार्यान्वयन को देखना है।

import collections
import tensorflow as tf
import tensorflow_federated as tff

इसके बजाय संक्षेप का value , उदाहरण के कार्य योग करने के लिए है value * 2.0 और उसके बाद से राशि विभाजित 2.0 । एकत्रीकरण परिणाम इस प्रकार गणितीय सीधे संक्षेप के बराबर है value , और तीन भागों से मिलकर रूप में सोचा जा सकता है: (1) ग्राहकों पर स्केलिंग (2) ग्राहकों (3) सर्वर पर unscaling भर में संक्षेप।

डिजाइन ऊपर बताया गया है के बाद, तर्क का एक उपवर्ग के रूप में लागू किया जाएगा tff.aggregators.UnweightedAggregationFactory है, जो उचित बनाता tff.templates.AggregationProcess जब किसी दिए गए value_type कुल करने के लिए:

न्यूनतम कार्यान्वयन

उदाहरण कार्य के लिए, आवश्यक गणना हमेशा समान होती है, इसलिए राज्य का उपयोग करने की कोई आवश्यकता नहीं है। यह इस प्रकार रिक्त होता है, और के रूप में प्रस्तुत किया जाता है tff.federated_value((), tff.SERVER) । माप के लिए भी यही है, अभी के लिए।

कार्य का न्यूनतम कार्यान्वयन इस प्रकार है:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

सब कुछ अपेक्षित रूप से काम करता है या नहीं, इसे निम्नलिखित कोड से सत्यापित किया जा सकता है:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

स्टेटफुलनेस और माप

टीएफएफ में व्यापक रूप से स्टेटफुलनेस का उपयोग उन गणनाओं का प्रतिनिधित्व करने के लिए किया जाता है जिन्हें पुनरावृत्त रूप से निष्पादित करने और प्रत्येक पुनरावृत्ति के साथ बदलने की उम्मीद है। उदाहरण के लिए, सीखने की गणना की स्थिति में सीखे जा रहे मॉडल का भार होता है।

यह समझाने के लिए कि एकत्रीकरण गणना में राज्य का उपयोग कैसे किया जाता है, हम उदाहरण कार्य को संशोधित करते हैं। इसके बजाय गुणा करने का value द्वारा 2.0 , हम गुणा यह यात्रा सूचकांक द्वारा - समय की संख्या एकत्रीकरण मार डाला गया है।

ऐसा करने के लिए, हमें पुनरावृत्ति सूचकांक का ट्रैक रखने का एक तरीका चाहिए, जो राज्य की अवधारणा के माध्यम से प्राप्त किया जाता है। में initialize_fn , बजाय एक खाली राज्य बनाने के लिए, हम राज्य को प्रारंभ एक अदिश शून्य होने के लिए। फिर, राज्य में इस्तेमाल किया जा सकता next_fn (1) द्वारा वेतन वृद्धि: तीन चरणों में 1.0 , (2) गुणा करने के लिए उपयोग value , और (3) नई जानकारी वाल राज्य के रूप में वापसी।

यह हो जाने पर, आप नोट कर सकते हैं: लेकिन वास्तव में इसके बाद के संस्करण के रूप में ही कोड अपेक्षित ढंग से सभी कार्यों को सत्यापित करने के लिए इस्तेमाल किया जा सकता है। मुझे कैसे पता चलेगा कि वास्तव में कुछ बदल गया है?

अच्छा प्रश्न! यहीं पर मापन की अवधारणा उपयोगी हो जाती है। सामान्य तौर पर, माप की एक एकल निष्पादन के लिए प्रासंगिक किसी भी मूल्य रिपोर्ट कर सकते हैं next समारोह है, जो की निगरानी के लिए इस्तेमाल किया जा सकता। इस मामले में, यह हो सकता है summed_value पिछले उदाहरण से। यही है, "अनस्केलिंग" चरण से पहले का मान, जो पुनरावृत्ति सूचकांक पर निर्भर होना चाहिए। फिर, यह व्यवहार में आवश्यक रूप से उपयोगी नहीं है, लेकिन प्रासंगिक तंत्र को दर्शाता है।

कार्य का राज्यव्यापी उत्तर इस प्रकार दिखता है:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

ध्यान दें कि state कि में आता है next_fn इनपुट के रूप में सर्वर पर रखा गया है। ये ग्राहकों पर इसका इस्तेमाल करने में, यह पहले सूचित किया जाना करने के लिए जो का उपयोग कर हासिल की है जरूरत tff.federated_broadcast ऑपरेटर।

उम्मीद के रूप में सभी काम करता है सत्यापित करने के लिए, अब हम सूचना देख सकते हैं measurements , है, जो निष्पादन के प्रत्येक दौर के साथ अलग होना चाहिए, भले ही एक ही साथ रन client_data

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

संरचित प्रकार

फ़ेडरेटेड लर्निंग में प्रशिक्षित मॉडल के मॉडल वेट को आमतौर पर एक टेंसर के बजाय टेंसर के संग्रह के रूप में दर्शाया जाता है। TFF में, यह के रूप में प्रस्तुत किया जाता है tff.StructType और आम तौर पर उपयोगी एकत्रीकरण कारखानों संरचित प्रकार स्वीकार करने में सक्षम होने की जरूरत है।

हालांकि, ऊपर के उदाहरण में, हम केवल एक साथ काम किया tff.TensorType वस्तु। हम एक साथ एकत्रीकरण प्रक्रिया बनाने के लिए पिछले कारखाने का उपयोग करने का प्रयास करें tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , हम एक अजीब त्रुटि क्योंकि मिल TensorFlow एक गुणा करने की कोशिश करेंगे tf.Tensor और एक list

समस्या यह है कि बजाय एक निरंतर द्वारा tensors की संरचना गुणा करने का, हम एक निरंतर द्वारा संरचना में प्रत्येक टेन्सर गुणा करने की आवश्यकता है। इस समस्या के लिए हमेशा की तरह समाधान का उपयोग करने के लिए है tf.nest बनाया मॉड्यूल के अंदर tff.tf_computation रों।

पिछले संस्करण की ExampleTaskFactory इस प्रकार के रूप में संरचित प्रकार के साथ संगत इस प्रकार है:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

यह उदाहरण एक पैटर्न पर प्रकाश डालता है जो TFF कोड की संरचना करते समय अनुसरण करने के लिए उपयोगी हो सकता है। बहुत ही सरल संचालन के साथ काम नहीं करते हैं, तो कोड अधिक स्पष्ट हो जाता है जब tff.tf_computation रों है कि एक के अंदर ब्लॉक के निर्माण के रूप में इस्तेमाल किया जाएगा tff.federated_computation एक अलग जगह में बनाया जाता है। के अंदर tff.federated_computation , इन बिल्डिंग ब्लॉक्स केवल आंतरिक ऑपरेटरों का उपयोग जुड़े हुए हैं।

यह सत्यापित करने के लिए कि यह अपेक्षा के अनुरूप काम करता है:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

आंतरिक एकत्रीकरण

अंतिम चरण वैकल्पिक रूप से अन्य कारखानों को वास्तविक एकत्रीकरण के प्रतिनिधिमंडल को सक्षम करना है, ताकि विभिन्न एकत्रीकरण तकनीकों की आसान संरचना की अनुमति मिल सके।

यह एक वैकल्पिक बनाने के द्वारा हासिल की है inner_factory हमारे के निर्माता में तर्क ExampleTaskFactory । यदि निर्दिष्ट नहीं, tff.aggregators.SumFactory प्रयोग किया जाता है, जो लागू होता है tff.federated_sum पिछले अनुभाग में सीधे इस्तेमाल किया ऑपरेटर।

जब create कहा जाता है, हम पहले कॉल कर सकते हैं create के inner_factory ही साथ आंतरिक एकत्रीकरण प्रक्रिया बनाने के लिए value_type

हमारी प्रक्रिया द्वारा वापस की स्थिति initialize_fn राज्य द्वारा "इस" प्रक्रिया बनाया है, और अभी बनाया आंतरिक प्रक्रिया के राज्य: दो भागों की एक रचना है।

के कार्यान्वयन next_fn कि वास्तविक एकत्रीकरण में अलग है को सौंपा जाता है next आंतरिक प्रक्रिया के समारोह, और कैसे अंतिम आउटपुट बना है में। राज्य फिर "इस" और "आंतरिक" राज्य से बना है, और माप एक के रूप में एक समान तरीके से बना रहे हैं OrderedDict

निम्नलिखित इस तरह के पैटर्न का कार्यान्वयन है।

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

जब करने के लिए सौंपने inner_process.next समारोह, वापसी संरचना पर हम पाते हैं एक है tff.templates.MeasuredProcessOutput - एक ही तीन क्षेत्रों के साथ, state , result और measurements । जब बना एकत्रीकरण प्रक्रिया की समग्र वापसी संरचना का निर्माण, state और measurements क्षेत्रों आम तौर पर बना है और एक साथ वापस आ जाना चाहिए। इसके विपरीत, result मूल्य के लिए क्षेत्र मेल खाती एकत्रित किया जा रहा है और बना एकत्रीकरण बजाय "के माध्यम से बहती"।

state वस्तु कारखाने के एक कार्यान्वयन विस्तार के रूप में देखा जाना चाहिए, और इस तरह रचना किसी भी संरचना के हो सकता है। हालांकि, measurements पत्र व्यवहार करने के मूल्यों कुछ बिंदु पर उपयोगकर्ता को सूचित किया जाना है। इसलिए, हम उपयोग करने की सलाह देते OrderedDict बना इस तरह के नामकरण कि यह जहां एक रचना में एक मीट्रिक सूचना से आता है स्पष्ट हो जाएगा के साथ,।

यह भी ध्यान रखें के उपयोग tff.federated_zip ऑपरेटर। state वस्तु बनाया प्रक्रिया द्वारा contolled एक होना चाहिए tff.FederatedType । हम बजाय लौटा था तो (this_state, inner_state) में initialize_fn , अपनी वापसी प्रकार हस्ताक्षर एक होगा tff.StructType की एक 2-टपल युक्त tff.FederatedType रों। के उपयोग tff.federated_zip "लिफ्टों" tff.FederatedType शीर्ष स्तर पर। यह इसी तरह से प्रयोग किया जाता है next_fn जब तैयारी कर राज्य और माप लौटा दी है।

अंत में, हम देख सकते हैं कि इसका उपयोग डिफ़ॉल्ट आंतरिक एकत्रीकरण के साथ कैसे किया जा सकता है:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... और एक अलग आंतरिक एकत्रीकरण के साथ। उदाहरण के लिए, एक ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

सारांश

इस ट्यूटोरियल में, हमने एक सामान्य-उद्देश्यीय एग्रीगेशन बिल्डिंग ब्लॉक बनाने के लिए पालन करने के लिए सर्वोत्तम अभ्यासों के बारे में बताया, जिसे एग्रीगेशन फैक्ट्री के रूप में दर्शाया गया है। व्यापकता दो तरह से डिजाइन के इरादे से आती है:

  1. पैरामीटरयुक्त संगणनाएं। एकत्रीकरण एक स्वतंत्र निर्माण खंड है कि अन्य TFF के साथ काम करने के लिए डिजाइन मॉड्यूल में जोड़ा जा सकता है tff.aggregators जैसे उनके आवश्यक एकत्रीकरण parameterize करने के लिए, tff.learning.build_federated_averaging_process
  2. एकत्रीकरण रचना। अधिक जटिल समग्र एकत्रीकरण बनाने के लिए अन्य एकत्रीकरण बिल्डिंग ब्लॉकों के साथ एक एकत्रीकरण बिल्डिंग ब्लॉक बनाया जा सकता है।