Посмотреть на TensorFlow.org | Запускаем в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
В этом уроке мы объясним принципы проектирования , лежащие в основе tff.aggregators
модуля и передовой опыт для реализации пользовательского агрегирования значений от клиентов к серверу.
Предпосылки. Предполагается , что вы уже знакомы с основными понятиями Федеративные Ядра , такие как места размещения ( tff.SERVER
, tff.CLIENTS
), как ПТФ представляет вычисления ( tff.tf_computation
, tff.federated_computation
) и их подписи типа.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
Резюме дизайна
В TFF, «агрегация» относится к движению набора значений на tff.CLIENTS
, чтобы произвести совокупную стоимость одного и того же типа на tff.SERVER
. То есть не обязательно иметь ценность каждого отдельного клиента. Например, при федеративном обучении обновления клиентской модели усредняются, чтобы получить агрегированное обновление модели для применения к глобальной модели на сервере.
В дополнение к операторам достижения этой цели , такие как tff.federated_sum
, ПТФ обеспечивает tff.templates.AggregationProcess
(а с сохранением состояния процесса ) , который формализует тип подписи для агрегации вычислений , так что можно обобщить на более сложные формы , чем простая сумма.
Основные компоненты tff.aggregators
модуля являются заводами по созданию AggregationProcess
, которые предназначены , чтобы быть в целом полезными и сменные строительными блоками TFF в двух аспектах:
- Параметризованные вычисления. Агрегирование является самостоятельным строительным блоком , который может быть подключен к другим модулям ПТФА , предназначенных для работы с
tff.aggregators
спараметрировать их необходимую агрегацию.
Пример:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- Агрегационный состав. Строительный блок агрегирования может быть составлен с другими строительными блоками агрегирования для создания более сложных составных агрегатов.
Пример:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
Остальная часть этого руководства объясняет, как достигаются эти две цели.
Процесс агрегирования
Мы первый итог tff.templates.AggregationProcess
, и следовать с рисунком фабрики для ее создания.
tff.templates.AggregationProcess
является tff.templates.MeasuredProcess
с подписями типа , указанные для агрегации. В частности, initialize
и next
функции имеют подписи типа:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
Состояние (тип state_type
) должно быть размещено на сервере. next
функция принимает в качестве входного параметра состояния и значение , которое должно быть агрегированной (типа value_type
) размещены на клиентов. В *
означает опционно другие входные аргументы, например , весы в средневзвешенном. Он возвращает обновленный объект состояния, агрегированное значение того же типа, размещенное на сервере, и некоторые измерения.
Обратите внимание , что оба государства должны быть переданы между расстрелами next
функции, и сообщаемые измерения предназначены , чтобы сообщить какую - либо информацию в зависимости от выполнения конкретной части next
функции, может быть пустым. Тем не менее, они должны быть четко указаны, чтобы другие части TFF имели четкий контракт, которому нужно следовать.
Другие модули TFF, например , обновление модели в tff.learning
, как ожидается, использовать tff.templates.AggregationProcess
спараметрировать как значения суммируются. Однако то, что именно агрегируются значения и каковы их типовые сигнатуры, зависит от других деталей обучаемой модели и алгоритма обучения, используемого для этого.
Для агрегации независимо от других аспектов вычислений, мы используем шаблон фабрики - мы создаем соответствующий tff.templates.AggregationProcess
только соответствующие сигнатуры типов объектов , которые будут агрегатироваться доступны, активируя create
метод завода. Таким образом, непосредственное управление процессом агрегирования необходимо только авторам библиотеки, которые несут ответственность за это создание.
Заводы по переработке агрегации
Есть два абстрактных базовых фабричных класса для невзвешенного и взвешенного агрегирования. Их create
метод принимает подпись типа значения должны быть объединен и возвращает tff.templates.AggregationProcess
для агрегации таких значений.
Процесс созданный tff.aggregators.UnweightedAggregationFactory
принимает два входных аргумента: (1) состояние на сервере и (2) значение указанного типа value_type
.
Пример реализации является tff.aggregators.SumFactory
.
Процесс созданный tff.aggregators.WeightedAggregationFactory
принимает три входных аргумента: (1) состояние на сервере, (2) значение указанного типа value_type
и (3) вес типа weight_type
, как указано пользователем завода при вызове ее create
метода.
Пример реализации является tff.aggregators.MeanFactory
, который вычисляет взвешенное среднее.
Заводской шаблон - это то, как мы достигаем первой цели, указанной выше; эта агрегация является независимым строительным блоком. Например, при изменении обучаемых переменных модели сложная агрегация не обязательно должна изменяться; завод , представляющий его будет вызываться с другим типом подписи , когда используется таким способом, как tff.learning.build_federated_averaging_process
.
Композиции
Напомним, что общий процесс агрегации может инкапсулировать (а) некоторую предварительную обработку значений на клиентах, (б) перемещение значений от клиента к серверу и (в) некоторую постобработку агрегированного значения на сервере. Вторая цель указаны выше, композиции агрегации, реализуются внутри в tff.aggregators
модуля путем структурированием реализации агрегации заводов , таких , что часть (б) может быть передан на другую агрегацию фабрику.
Вместо того, чтобы реализовывать всю необходимую логику в рамках одного фабричного класса, реализации по умолчанию сосредоточены на одном аспекте, важном для агрегации. При необходимости этот шаблон затем позволяет нам заменять строительные блоки по одному.
Примером является взвешенной tff.aggregators.MeanFactory
. Его реализация умножает предоставленные значения и веса на клиентах, затем суммирует взвешенные значения и веса независимо, а затем делит сумму взвешенных значений на сумму весов на сервере. Вместо того , чтобы реализации сложений, непосредственно используя tff.federated_sum
оператора, суммирование делегированы двух экземпляров tff.aggregators.SumFactory
.
Такая структура позволяет заменять два суммирования по умолчанию разными фабриками, которые реализуют сумму по-разному. Например, tff.aggregators.SecureSumFactory
или Заказная реализация tff.aggregators.UnweightedAggregationFactory
. С другой стороны , время, tff.aggregators.MeanFactory
сам по себе может быть внутренней агрегация другого завода , такие как tff.aggregators.clipping_factory
, если значения должны быть обрезаны до усреднения.
Смотрите предыдущие настройки рекомендуются агрегирование для обучения учебника для receommended использования механизма композиции с использованием существующих заводов в tff.aggregators
модуле.
Лучшие практики на примере
Мы будем иллюстрировать tff.aggregators
концепции в деталях реализации простой пример задачи, и сделать его более и более общее. Еще один способ научиться - это посмотреть на реализацию существующих фабрик.
import collections
import tensorflow as tf
import tensorflow_federated as tff
Вместо суммирования value
, пример задача состоит в том, чтобы подвести value * 2.0
, а затем разделить сумму на 2.0
. Результат агрегации, таким образом , математически эквивалентна непосредственно суммирования value
, и можно было бы рассматривать как состоящую из трех частей: (1) масштабирование на клиентов (2) суммирование по клиентам (3) на сервере водяного камня.
После дизайн пояснялось выше, логика будет реализован как подкласс tff.aggregators.UnweightedAggregationFactory
, который создает соответствующий tff.templates.AggregationProcess
, когда данный value_type
в агрегат:
Минимальная реализация
Для задачи примера необходимые вычисления всегда одни и те же, поэтому нет необходимости использовать состояние. Это, таким образом , пустые, и представлен в виде tff.federated_value((), tff.SERVER)
. То же самое пока и с измерениями.
Таким образом, минимальная реализация задачи следующая:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Все ли работает должным образом, можно проверить с помощью следующего кода:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
Состояние и измерения
Сохранение состояния широко используется в TFF для представления вычислений, которые, как ожидается, будут выполняться итеративно и изменяться с каждой итерацией. Например, состояние обучающего вычисления содержит веса обучаемой модели.
Чтобы проиллюстрировать, как использовать состояние в вычислении агрегирования, мы изменим пример задачи. Вместо умножения value
на 2.0
, мы умножаем его на индекс итерации - количество раз , агрегация была выполнена.
Для этого нам нужен способ отслеживать индекс итерации, что достигается за счет концепции состояния. В initialize_fn
, вместо создания пустого состояния, мы инициализировать состояние быть нулевым скаляр. Затем состояние может быть использовано в next_fn
в три этапа: (1) увеличивается на 1.0
, (2) используют для умножения value
, и (3) возвращают в качестве нового обновленного состояния.
Как только это будет сделано, вы можете заметить: Но точно такой же код , как описано выше , может быть использован для проверки всех работ , как ожидалось. Как я узнаю, что что-то действительно изменилось?
Хороший вопрос! Здесь становится полезной концепция измерений. В общем, измерения могут сообщать любые значения , имеющие отношение к одному исполнению next
функции, которая может быть использована для мониторинга. В этом случае, он может быть summed_value
из предыдущего примера. То есть значение до шага «масштабирования», которое должно зависеть от индекса итерации. Опять же, это не обязательно полезно на практике, но иллюстрирует соответствующий механизм.
Таким образом, ответ на задачу с сохранением состояния выглядит следующим образом:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Следует отметить , что state
, которое приходит в next_fn
в качестве входных данных размещаются на сервере. Для того , чтобы использовать его на клиентах, он сначала должен быть сообщен, что достигается с помощью tff.federated_broadcast
оператора.
Для того, чтобы проверить , все работы , как и ожидалось, теперь мы можем взглянуть на сообщенных measurements
, которые должны быть различными с каждым раундом исполнения, даже если бежать с такой же client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
Структурированные типы
Веса модели для модели, обученной федеративному обучению, обычно представлены в виде набора тензоров, а не одного тензора. В TFF, это представляется как tff.StructType
и вообще полезные заводы агрегатных должны быть в состоянии принять структурированных типов.
Однако, в приведенных выше примерах, мы работали только с tff.TensorType
объекта. Если попытаться использовать предыдущую фабрику для создания процесса агрегации с tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
, мы получаем странную ошибку , потому что TensorFlow будет пытаться умножить tf.Tensor
и list
.
Проблема заключается в том, что вместо умножения структуры тензоров на константу, мы должны умножить каждый тензор в структуре константы. Обычное решение этой проблемы заключается в использовании tf.nest
модуля внутри созданного tff.tf_computation
с.
Версия предыдущего ExampleTaskFactory
совместимого с структурированными типами , таким образом , выглядит следующим образом :
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
В этом примере подчеркивается шаблон, которому может быть полезно следовать при структурировании кода TFF. Когда дело не с очень простыми операциями, код становится более разборчивым , когда tff.tf_computation
s , которые будут использоваться в качестве строительных блоков внутри tff.federated_computation
создается в отдельном месте. Внутри tff.federated_computation
, эти строительные блоки подключаются только используя внутренние операторы.
Чтобы убедиться, что он работает должным образом:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
Внутренние скопления
Последний шаг - необязательно разрешить делегирование фактического агрегирования другим фабрикам, чтобы упростить составление различных методов агрегирования.
Это достигается за счет создания дополнительного inner_factory
аргумент в конструкторе нашего ExampleTaskFactory
. Если не указано, tff.aggregators.SumFactory
используется, который применяет tff.federated_sum
оператор , используемый непосредственно в предыдущем разделе.
Когда create
называется, мы можем назвать первым create
из inner_factory
создать внутренний процесс агрегации с тем же value_type
.
Состояние нашего процесса возвращенного initialize_fn
представляет собой композицию из двух частей: государства , созданных «этого» процесс, и состояния только что созданного внутреннего процесса.
Реализация next_fn
отличается тем , что фактической агрегации делегированы next
функции внутреннего процесса, так и в том , как составлен окончательный вывод. Состояние снова состоят из «этого» и «внутреннего» состояния, а измерения состоят в том же порядке , как OrderedDict
.
Ниже представлена реализация такого шаблона.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
При делегировании в inner_process.next
функции, структура возврата мы получаем это tff.templates.MeasuredProcessOutput
, с теми же тремя полями - state
, result
и measurements
. При создании общей структуры возвращаемой в составе процесса агрегации, то state
и measurements
поле должно быть составлено , как правило , и вернулся вместе. В противоположность этому , result
поля соответствует значению , которые объединяются и вместо «течет через» в составе агрегации.
state
объекта следует рассматривать как деталь реализации завода, и , таким образом , композиция может быть любой структурой. Однако measurements
соответствуют значения , которые будут сообщать пользователю в какой - то момент. Поэтому мы рекомендуем использовать OrderedDict
с именованием , такими авторской , что было бы понятно , где в составе делает сообщила Метрика приходит.
Следует также отметить использование tff.federated_zip
оператора. state
объекта contolled с помощью созданного процесса должны быть tff.FederatedType
. Если бы мы вместо этого вернулся (this_state, inner_state)
в initialize_fn
, его возвращаемый тип подписи будет tff.StructType
, содержащий 2-кортеж tff.FederatedType
s. Использование tff.federated_zip
«лифты» в tff.FederatedType
до уровня верхнего. Это аналогично использует в next_fn
при подготовке состояние и измерения должны быть возвращены.
Наконец, мы можем увидеть, как это можно использовать с внутренней агрегацией по умолчанию:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... и с другой внутренней агрегацией. Например, ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
Резюме
В этом руководстве мы объяснили лучшие практики, которым нужно следовать, чтобы создать универсальный строительный блок агрегации, представленный как фабрика агрегации. Общность достигается за счет замысла дизайна двумя способами:
- Параметризованные вычисления. Агрегирование является самостоятельным строительным блоком , который может быть подключен к другим модулям ПТФА , предназначенных для работы с
tff.aggregators
спараметрировать их необходимую агрегацию, такие какtff.learning.build_federated_averaging_process
. - Агрегационный состав. Строительный блок агрегирования может быть составлен с другими строительными блоками агрегирования для создания более сложных составных агрегатов.