Construyendo una canalización TFX localmente

TFX facilita la organización de su flujo de trabajo de aprendizaje automático (ML) como canalización para:

  • Automatice su proceso de aprendizaje automático, lo que le permite volver a entrenar, evaluar e implementar su modelo con regularidad.
  • Cree canales de aprendizaje automático que incluyan un análisis profundo del rendimiento del modelo y la validación de modelos recién entrenados para garantizar el rendimiento y la confiabilidad.
  • Monitoree los datos de capacitación para detectar anomalías y elimine el sesgo en el servicio de capacitación
  • Aumente la velocidad de la experimentación ejecutando una canalización con diferentes conjuntos de hiperparámetros.

Un proceso típico de desarrollo de tuberías comienza en una máquina local, con el análisis de datos y la configuración de los componentes, antes de implementarse en producción. Esta guía describe dos formas de construir una tubería localmente.

  • Personalice una plantilla de canalización TFX para que se ajuste a las necesidades de su flujo de trabajo de ML. Las plantillas de canalización de TFX son flujos de trabajo prediseñados que demuestran las mejores prácticas utilizando los componentes estándar de TFX.
  • Construya una tubería usando TFX. En este caso de uso, define una canalización sin comenzar a partir de una plantilla.

Mientras desarrolla su canalización, puede ejecutarla con LocalDagRunner . Luego, una vez que los componentes de la canalización se hayan definido y probado bien, se utilizará un orquestador de nivel de producción como Kubeflow o Airflow.

Antes de comenzar

TFX es un paquete de Python, por lo que deberá configurar un entorno de desarrollo de Python, como un entorno virtual o un contenedor Docker. Entonces:

pip install tfx

Si es nuevo en las canalizaciones TFX, obtenga más información sobre los conceptos básicos de las canalizaciones TFX antes de continuar.

Construya una tubería usando una plantilla

Las plantillas de canalización TFX facilitan el inicio del desarrollo de canalizaciones al proporcionar un conjunto prediseñado de definiciones de canalizaciones que puede personalizar según su caso de uso.

Las siguientes secciones describen cómo crear una copia de una plantilla y personalizarla para satisfacer sus necesidades.

Crear una copia de la plantilla de canalización

  1. Consulte la lista de plantillas de canalización TFX disponibles:

    tfx template list
    
  2. Seleccione una plantilla de la lista

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Reemplace lo siguiente:

    • template : el nombre de la plantilla que desea copiar.
    • pipeline-name : el nombre de la canalización que se creará.
    • destination-path : la ruta para copiar la plantilla.

    Obtenga más información sobre el comando tfx template copy .

  3. Se ha creado una copia de la plantilla de canalización en la ruta que especificó.

Explora la plantilla de canalización

Esta sección proporciona una descripción general del andamiaje creado por una plantilla.

  1. Explore los directorios y archivos que se copiaron en el directorio raíz de su canalización

    • Un directorio de tuberías con
      • pipeline.py : define la canalización y enumera qué componentes se están utilizando
      • configs.py : contiene detalles de configuración, como de dónde provienen los datos o qué orquestador se está utilizando.
    • un directorio de datos
      • Normalmente contiene un archivo data.csv , que es la fuente predeterminada para ExampleGen . Puede cambiar la fuente de datos en configs.py .
    • Un directorio de modelos con código de preprocesamiento e implementaciones de modelos.

    • La plantilla copia los corredores DAG para el entorno local y Kubeflow.

    • Algunas plantillas también incluyen cuadernos de Python para que pueda explorar sus datos y artefactos con metadatos de aprendizaje automático.

  2. Ejecute los siguientes comandos en su directorio de canalización:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    El comando crea una ejecución de canalización utilizando LocalDagRunner , que agrega los siguientes directorios a su canalización:

    • Un directorio tfx_metadata que contiene el almacén de metadatos de ML utilizado localmente.
    • Un directorio tfx_pipeline_output que contiene las salidas de archivos de la canalización.
  3. Abra el archivo pipeline/configs.py de su canalización y revise el contenido. Este script define las opciones de configuración utilizadas por la canalización y las funciones de los componentes. Aquí es donde especificaría cosas como la ubicación de la fuente de datos o la cantidad de pasos de entrenamiento en una ejecución.

  4. Abra el archivo pipeline/pipeline.py de su canalización y revise el contenido. Este script crea la canalización TFX. Inicialmente, la canalización contiene solo un componente ExampleGen .

    • Siga las instrucciones en los comentarios TODO en pipeline.py para agregar más pasos al pipeline.
  5. Abra el archivo local_runner.py y revise el contenido. Este script crea una ejecución de canalización y especifica los parámetros de la ejecución, como data_path y preprocessing_fn .

  6. Ha revisado el andamiaje creado por la plantilla y ha creado una ejecución de canalización utilizando LocalDagRunner . A continuación, personalice la plantilla para que se ajuste a sus necesidades.

Personaliza tu canalización

Esta sección proporciona una descripción general de cómo comenzar a personalizar su plantilla.

  1. Diseñe su tubería. El andamiaje que proporciona una plantilla le ayuda a implementar una canalización para datos tabulares utilizando los componentes estándar TFX. Si está trasladando un flujo de trabajo de aprendizaje automático existente a una canalización, es posible que deba revisar su código para aprovechar al máximo los componentes estándar de TFX . Es posible que también necesite crear componentes personalizados que implementen funciones que sean exclusivas de su flujo de trabajo o que aún no sean compatibles con los componentes estándar de TFX.

  2. Una vez que haya diseñado su canalización, personalícela de forma iterativa mediante el siguiente proceso. Comience desde el componente que ingiere datos en su canalización, que suele ser el componente ExampleGen .

    1. Personalice la canalización o un componente para que se ajuste a su caso de uso. Estas personalizaciones pueden incluir cambios como:

      • Cambio de parámetros de tubería.
      • Agregar componentes a la tubería o eliminarlos.
      • Reemplazo de la fuente de entrada de datos. Esta fuente de datos puede ser un archivo o consultas a servicios como BigQuery.
      • Cambiar la configuración de un componente en la tubería.
      • Cambiar la función de personalización de un componente.
    2. Ejecute el componente localmente usando el script local_runner.py u otro ejecutor DAG apropiado si está usando un orquestador diferente. Si el script falla, depure el error y vuelva a intentar ejecutar el script.

    3. Una vez que esta personalización esté funcionando, pase a la siguiente personalización.

  3. Al trabajar de forma iterativa, puede personalizar cada paso del flujo de trabajo de la plantilla para satisfacer sus necesidades.

Construya una canalización personalizada

Utilice las siguientes instrucciones para obtener más información sobre cómo crear una canalización personalizada sin utilizar una plantilla.

  1. Diseñe su tubería. Los componentes estándar de TFX brindan una funcionalidad comprobada para ayudarlo a implementar un flujo de trabajo de aprendizaje automático completo. Si está trasladando un flujo de trabajo de ML existente a una canalización, es posible que deba revisar su código para aprovechar al máximo los componentes estándar de TFX. Es posible que también necesite crear componentes personalizados que implementen funciones como el aumento de datos.

  2. Cree un archivo de script para definir su canalización usando el siguiente ejemplo. Esta guía hace referencia a este archivo como my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    En los siguientes pasos, definirá su canalización en create_pipeline y ejecutará su canalización localmente utilizando el ejecutor local.

    Construya iterativamente su canalización mediante el siguiente proceso.

    1. Personalice la canalización o un componente para que se ajuste a su caso de uso. Estas personalizaciones pueden incluir cambios como:

      • Cambio de parámetros de tubería.
      • Agregar componentes a la tubería o eliminarlos.
      • Reemplazo de un archivo de entrada de datos.
      • Cambiar la configuración de un componente en la tubería.
      • Cambiar la función de personalización de un componente.
    2. Ejecute el componente localmente utilizando el ejecutor local o ejecutando el script directamente. Si el script falla, depure el error y vuelva a intentar ejecutar el script.

    3. Una vez que esta personalización esté funcionando, pase a la siguiente personalización.

    Comience desde el primer nodo del flujo de trabajo de su canalización; normalmente, el primer nodo ingiere datos en su canalización.

  3. Agregue el primer nodo de su flujo de trabajo a su canalización. En este ejemplo, la canalización utiliza el componente estándar ExampleGen para cargar un CSV desde un directorio en ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen crea registros de ejemplo serializados utilizando los datos del CSV en la ruta de datos especificada. Configurando el parámetro input_base del componente CsvExampleGen con la raíz de datos.

  4. Cree un directorio data en el mismo directorio que my_pipeline.py . Agregue un pequeño archivo CSV al directorio data .

  5. Utilice el siguiente comando para ejecutar su script my_pipeline.py .

    python my_pipeline.py
    

    El resultado debería ser algo como lo siguiente:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Continúe agregando componentes de forma iterativa a su canalización.