Creación de componentes totalmente personalizados

Esta guía describe cómo utilizar la API de TFX para crear un componente totalmente personalizado. Los componentes completamente personalizados le permiten crear componentes definiendo la especificación del componente, el ejecutor y las clases de interfaz del componente. Este enfoque le permite reutilizar y ampliar un componente estándar para que se ajuste a sus necesidades.

Si es nuevo en las canalizaciones de TFX, obtenga más información sobre los conceptos básicos de las canalizaciones de TFX .

Ejecutor personalizado o componente personalizado

Si solo se necesita una lógica de procesamiento personalizada mientras que las entradas, las salidas y las propiedades de ejecución del componente son las mismas que las de un componente existente, un ejecutor personalizado es suficiente. Se necesita un componente completamente personalizado cuando cualquiera de las entradas, salidas o propiedades de ejecución es diferente de cualquier componente TFX existente.

¿Cómo crear un componente personalizado?

El desarrollo de un componente totalmente personalizado requiere:

  • Un conjunto definido de especificaciones de artefactos de entrada y salida para el nuevo componente. Especialmente, los tipos de artefactos de entrada deben ser consistentes con los tipos de artefactos de salida de los componentes que producen los artefactos y los tipos de artefactos de salida deben ser consistentes con los tipos de artefactos de entrada de los componentes que consumen los artefactos, si los hay.
  • Los parámetros de ejecución que no son artefactos que se necesitan para el nuevo componente.

Especificaciones del componente

La clase ComponentSpec define el contrato del componente definiendo los artefactos de entrada y salida de un componente, así como los parámetros que se utilizan para la ejecución del componente. Tiene tres partes:

  • ENTRADAS : un diccionario de parámetros escritos para los artefactos de entrada que se pasan al ejecutor del componente. Normalmente, los artefactos de entrada son las salidas de los componentes ascendentes y, por lo tanto, comparten el mismo tipo.
  • SALIDAS : un diccionario de parámetros escritos para los artefactos de salida que produce el componente.
  • PARÁMETROS : un diccionario de elementos ExecutionParameter adicionales que se pasarán al ejecutor del componente. Estos son parámetros que no son artefactos que queremos definir de manera flexible en el DSL de canalización y pasar a la ejecución.

Aquí hay un ejemplo de ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Ejecutor

A continuación, escriba el código ejecutor del nuevo componente. Básicamente, se debe crear una nueva subclase de base_executor.BaseExecutor con su función Do anulada. En la función Do , los argumentos input_dict , output_dict y exec_properties que se pasan en mapa a INPUTS , OUTPUTS y PARAMETERS que se definen en ComponentSpec respectivamente. Para exec_properties , el valor se puede obtener directamente a través de una búsqueda en el diccionario. Para artefactos en input_dict y output_dict , hay funciones convenientes disponibles en la clase artefacto_utils que se pueden usar para obtener una instancia de artefacto o un uri de artefacto.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Unidad de prueba de un ejecutor personalizado

Se pueden crear pruebas unitarias para el ejecutor personalizado de forma similar a esta .

Interfaz de componentes

Ahora que la parte más compleja está completa, el siguiente paso es ensamblar estas piezas en una interfaz de componente, para permitir que el componente se use en una canalización. Hay varios pasos:

  • Haga que la interfaz del componente sea una subclase de base_component.BaseComponent
  • Asigne una variable de clase SPEC_CLASS con la clase ComponentSpec que se definió anteriormente
  • Asigne una variable de clase EXECUTOR_SPEC con la clase Executor que se definió anteriormente
  • Defina la función constructora __init__() usando los argumentos de la función para construir una instancia de la clase ComponentSpec e invoque la superfunción con ese valor, junto con un nombre opcional

Cuando se crea una instancia del componente, se invocará la lógica de verificación de tipo en la clase base_component.BaseComponent para garantizar que los argumentos que se pasaron sean compatibles con la información de tipo definida en la clase ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Ensamble en una canalización TFX

El último paso es conectar el nuevo componente personalizado a una canalización TFX. Además de agregar una instancia del nuevo componente, también se necesita lo siguiente:

  • Cablee correctamente los componentes aguas arriba y aguas abajo del nuevo componente. Esto se hace haciendo referencia a las salidas del componente ascendente en el nuevo componente y haciendo referencia a las salidas del nuevo componente en los componentes descendentes.
  • Agregue la nueva instancia de componente a la lista de componentes al construir la canalización.

El siguiente ejemplo destaca los cambios antes mencionados. El ejemplo completo se puede encontrar en el repositorio de TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Implementar un componente totalmente personalizado

Además de los cambios de código, todas las partes recién agregadas ( ComponentSpec , Executor , interfaz de componentes) deben ser accesibles en el entorno de ejecución de la canalización para ejecutar la canalización correctamente.