Création de composants entièrement personnalisés

Ce guide décrit comment utiliser l'API TFX pour créer un composant entièrement personnalisé. Les composants entièrement personnalisés vous permettent de créer des composants en définissant la spécification du composant, l'exécuteur et les classes d'interface du composant. Cette approche vous permet de réutiliser et d'étendre un composant standard pour répondre à vos besoins.

Si vous débutez avec les pipelines TFX, découvrez les concepts fondamentaux des pipelines TFX .

Exécuteur personnalisé ou composant personnalisé

Si seule une logique de traitement personnalisée est nécessaire alors que les entrées, sorties et propriétés d'exécution du composant sont les mêmes que celles d'un composant existant, un exécuteur personnalisé est suffisant. Un composant entièrement personnalisé est nécessaire lorsque l’une des entrées, sorties ou propriétés d’exécution est différente des composants TFX existants.

Comment créer un composant personnalisé ?

Développer un composant entièrement personnalisé nécessite :

  • Un ensemble défini de spécifications d'artefacts d'entrée et de sortie pour le nouveau composant. En particulier, les types des artefacts d'entrée doivent être cohérents avec les types d'artefacts de sortie des composants qui produisent les artefacts et les types des artefacts de sortie doivent être cohérents avec les types d'artefacts d'entrée des composants qui consomment les artefacts, le cas échéant.
  • Paramètres d'exécution non-artefacts nécessaires au nouveau composant.

Spécification du composant

La classe ComponentSpec définit le contrat de composant en définissant les artefacts d'entrée et de sortie d'un composant ainsi que les paramètres utilisés pour l'exécution du composant. Il comporte trois parties :

  • INPUTS : un dictionnaire de paramètres typés pour les artefacts d'entrée qui sont transmis à l'exécuteur du composant. Normalement, les artefacts d'entrée sont les sorties des composants en amont et partagent donc le même type.
  • OUTPUTS : Un dictionnaire de paramètres typés pour les artefacts de sortie produits par le composant.
  • PARAMETERS : un dictionnaire d'éléments ExecutionParameter supplémentaires qui seront transmis à l'exécuteur du composant. Il s'agit de paramètres non artefacts que nous souhaitons définir de manière flexible dans le pipeline DSL et passer à l'exécution.

Voici un exemple de ComponentSpec :

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Exécuteur

Ensuite, écrivez le code exécuteur du nouveau composant. Fondamentalement, une nouvelle sous-classe de base_executor.BaseExecutor doit être créée avec sa fonction Do remplacée. Dans la fonction Do , les arguments input_dict , output_dict et exec_properties qui sont transmis sont mappés vers INPUTS , OUTPUTS et PARAMETERS qui sont définis respectivement dans ComponentSpec. Pour exec_properties , la valeur peut être récupérée directement via une recherche dans le dictionnaire. Pour les artefacts dans input_dict et output_dict , des fonctions pratiques sont disponibles dans la classeartefact_utils qui peuvent être utilisées pour récupérer une instance d'artefact ou un URI d'artefact.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Test unitaire d'un exécuteur personnalisé

Des tests unitaires pour l'exécuteur personnalisé peuvent être créés de manière similaire à celui-ci .

Interface des composants

Maintenant que la partie la plus complexe est terminée, l'étape suivante consiste à assembler ces pièces dans une interface de composant, afin de permettre au composant d'être utilisé dans un pipeline. Il y a plusieurs étapes :

  • Faire de l'interface du composant une sous-classe de base_component.BaseComponent
  • Attribuez une variable de classe SPEC_CLASS avec la classe ComponentSpec définie précédemment
  • Attribuez une variable de classe EXECUTOR_SPEC avec la classe Executor définie précédemment
  • Définissez la fonction constructeur __init__() en utilisant les arguments de la fonction pour construire une instance de la classe ComponentSpec et invoquez la super fonction avec cette valeur, ainsi qu'un nom facultatif.

Lorsqu'une instance du composant est créée, la logique de vérification de type dans la classe base_component.BaseComponent sera invoquée pour garantir que les arguments transmis sont compatibles avec les informations de type définies dans la classe ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Assembler dans un pipeline TFX

La dernière étape consiste à connecter le nouveau composant personnalisé à un pipeline TFX. Outre l'ajout d'une instance du nouveau composant, les éléments suivants sont également nécessaires :

  • Câblez-y correctement les composants amont et aval du nouveau composant. Cela se fait en référençant les sorties du composant en amont dans le nouveau composant et en référençant les sorties du nouveau composant dans les composants en aval.
  • Ajoutez la nouvelle instance de composant à la liste des composants lors de la construction du pipeline.

L'exemple ci-dessous met en évidence les changements susmentionnés. Un exemple complet peut être trouvé dans le dépôt TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Déployer un composant entièrement personnalisé

Outre les modifications de code, toutes les parties nouvellement ajoutées ( ComponentSpec , Executor , interface de composant) doivent être accessibles dans l'environnement d'exécution du pipeline afin d'exécuter correctement le pipeline.