Создание полностью пользовательских компонентов

В этом руководстве описывается, как использовать TFX API для создания полностью настраиваемого компонента. Полностью настраиваемые компоненты позволяют создавать компоненты, определяя классы спецификации компонента, исполнителя и интерфейса компонента. Такой подход позволяет повторно использовать и расширять стандартный компонент в соответствии с вашими потребностями.

Если вы новичок в конвейерах TFX, узнайте больше об основных концепциях конвейеров TFX .

Пользовательский исполнитель или пользовательский компонент

Если требуется только пользовательская логика обработки, а входные, выходные данные и свойства выполнения компонента такие же, как у существующего компонента, достаточно специального исполнителя. Полностью настраиваемый компонент необходим, когда какие-либо входные, выходные данные или свойства выполнения отличаются от любых существующих компонентов TFX.

Как создать собственный компонент?

Для разработки полностью пользовательского компонента требуется:

  • Определенный набор спецификаций входных и выходных артефактов для нового компонента. В частности, типы входных артефактов должны соответствовать типам выходных артефактов компонентов, которые создают артефакты, а типы выходных артефактов должны соответствовать типам входных артефактов компонентов, которые используют артефакты, если таковые имеются.
  • Параметры выполнения без артефактов, необходимые для нового компонента.

Спецификация компонента

Класс ComponentSpec определяет контракт компонента, определяя входные и выходные артефакты компонента, а также параметры, которые используются для выполнения компонента. Он состоит из трех частей:

  • ВХОДЫ : словарь типизированных параметров для входных артефактов, которые передаются исполнителю компонента. Обычно входные артефакты представляют собой выходные данные вышестоящих компонентов и, следовательно, имеют один и тот же тип.
  • ВЫХОДЫ : словарь типизированных параметров для выходных артефактов, которые создает компонент.
  • ПАРАМЕТРЫ : словарь дополнительных элементов ExecutionParameter , которые будут переданы исполнителю компонента. Это параметры, не являющиеся артефактами, которые мы хотим гибко определить в конвейере DSL и передать в выполнение.

Вот пример ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Исполнитель

Затем напишите код исполнителя для нового компонента. По сути, необходимо создать новый подкласс base_executor.BaseExecutor с переопределенной функцией Do В функции Do аргументы input_dict , output_dict и exec_properties , которые передаются в сопоставлении с INPUTS , OUTPUTS и PARAMETERS , которые определены в ComponentSpec соответственно. Для exec_properties значение можно получить непосредственно посредством поиска в словаре. Для артефактов в input_dict и output_dict есть удобные функции, доступные в классе артефакта_utils , которые можно использовать для извлечения экземпляра артефакта или URI артефакта.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Модульное тестирование пользовательского исполнителя

Модульные тесты для пользовательского исполнителя можно создавать аналогично этому .

Интерфейс компонента

Теперь, когда самая сложная часть завершена, следующим шагом будет сборка этих частей в интерфейс компонента, чтобы можно было использовать компонент в конвейере. Есть несколько шагов:

  • Сделайте интерфейс компонента подклассом base_component.BaseComponent
  • Назначьте переменную класса SPEC_CLASS с классом ComponentSpec , который был определен ранее.
  • Назначьте переменную класса EXECUTOR_SPEC с классом Executor, который был определен ранее.
  • Определите функцию-конструктор __init__() используя аргументы функции для создания экземпляра класса ComponentSpec и вызовите суперфункцию с этим значением вместе с необязательным именем.

При создании экземпляра компонента будет вызвана логика проверки типа в классе base_component.BaseComponent , чтобы убедиться, что переданные аргументы совместимы с информацией о типе, определенной в классе ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Сборка в конвейер TFX

Последний шаг — подключить новый пользовательский компонент к конвейеру TFX. Помимо добавления экземпляра нового компонента, также необходимо следующее:

  • Правильно подключите к нему вышестоящие и нисходящие компоненты нового компонента. Это делается путем ссылки на выходные данные вышестоящего компонента в новом компоненте и ссылки на выходные данные нового компонента в последующих компонентах.
  • Добавьте новый экземпляр компонента в список компонентов при построении конвейера.

В приведенном ниже примере показаны вышеупомянутые изменения. Полный пример можно найти в репозитории TFX на GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Развертывание полностью настраиваемого компонента

Помимо изменений кода, все вновь добавленные части ( ComponentSpec , Executor , интерфейс компонента) должны быть доступны в среде выполнения конвейера, чтобы конвейер работал правильно.