Construindo um pipeline TFX localmente

O TFX facilita a orquestração do fluxo de trabalho de machine learning (ML) como um pipeline, para:

  • Automatize seu processo de ML, o que permite treinar, avaliar e implantar regularmente seu modelo.
  • Crie pipelines de ML que incluem análise profunda do desempenho do modelo e validação de modelos recém-treinados para garantir desempenho e confiabilidade.
  • Monitore dados de treinamento em busca de anomalias e elimine distorções no fornecimento de treinamento
  • Aumente a velocidade da experimentação executando um pipeline com diferentes conjuntos de hiperparâmetros.

Um processo típico de desenvolvimento de pipeline começa em uma máquina local, com análise de dados e configuração de componentes, antes de ser implantado na produção. Este guia descreve duas maneiras de construir um pipeline localmente.

  • Personalize um modelo de pipeline do TFX para atender às necessidades do seu fluxo de trabalho de ML. Os modelos de pipeline do TFX são fluxos de trabalho pré-construídos que demonstram as práticas recomendadas usando os componentes padrão do TFX.
  • Crie um pipeline usando TFX. Neste caso de uso, você define um pipeline sem iniciar a partir de um modelo.

Ao desenvolver seu pipeline, você pode executá-lo com LocalDagRunner . Então, uma vez que os componentes do pipeline tenham sido bem definidos e testados, você usaria um orquestrador de nível de produção, como Kubeflow ou Airflow.

Antes de começar

TFX é um pacote Python, então você precisará configurar um ambiente de desenvolvimento Python, como um ambiente virtual ou um contêiner Docker. Então:

pip install tfx

Se você é novo nos pipelines do TFX, saiba mais sobre os principais conceitos dos pipelines do TFX antes de continuar.

Crie um pipeline usando um modelo

Os modelos de pipeline do TFX facilitam o início do desenvolvimento de pipeline, fornecendo um conjunto pré-construído de definições de pipeline que você pode personalizar para seu caso de uso.

As seções a seguir descrevem como criar uma cópia de um modelo e personalizá-lo para atender às suas necessidades.

Crie uma cópia do modelo de pipeline

  1. Veja a lista dos modelos de pipeline TFX disponíveis:

    tfx template list
    
  2. Selecione um modelo da lista

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Substitua o seguinte:

    • template : O nome do modelo que você deseja copiar.
    • pipeline-name : o nome do pipeline a ser criado.
    • destination-path : o caminho para copiar o modelo.

    Saiba mais sobre o comando tfx template copy .

  3. Uma cópia do modelo de pipeline foi criada no caminho especificado.

Explore o modelo de pipeline

Esta seção fornece uma visão geral do andaime criado por um modelo.

  1. Explore os diretórios e arquivos que foram copiados para o diretório raiz do seu pipeline

    • Um diretório de pipeline com
      • pipeline.py – define o pipeline e lista quais componentes estão sendo usados
      • configs.py - mantém detalhes de configuração, como de onde vêm os dados ou qual orquestrador está sendo usado
    • Um diretório de dados
      • Normalmente contém um arquivo data.csv , que é a fonte padrão para ExampleGen . Você pode alterar a fonte de dados em configs.py .
    • Um diretório de modelos com código de pré-processamento e implementações de modelo

    • O modelo copia executores DAG para ambiente local e Kubeflow.

    • Alguns modelos também incluem Notebooks Python para que você possa explorar seus dados e artefatos com metadados de aprendizado de máquina.

  2. Execute os seguintes comandos no diretório do pipeline:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    O comando cria uma execução de pipeline usando LocalDagRunner , que adiciona os seguintes diretórios ao pipeline:

    • Um diretório tfx_metadata que contém o armazenamento de metadados de ML usado localmente.
    • Um diretório tfx_pipeline_output que contém as saídas de arquivo do pipeline.
  3. Abra o arquivo pipeline/configs.py do seu pipeline e revise o conteúdo. Este script define as opções de configuração usadas pelo pipeline e pelas funções do componente. É aqui que você especificaria coisas como a localização da fonte de dados ou o número de etapas de treinamento em uma execução.

  4. Abra o arquivo pipeline/pipeline.py do seu pipeline e revise o conteúdo. Este script cria o pipeline do TFX. Inicialmente, o pipeline contém apenas um componente ExampleGen .

    • Siga as instruções nos comentários TODO em pipeline.py para adicionar mais etapas ao pipeline.
  5. Abra o arquivo local_runner.py e revise o conteúdo. Este script cria uma execução de pipeline e especifica os parâmetros da execução, como data_path e preprocessing_fn .

  6. Você revisou o andaime criado pelo modelo e criou uma execução de pipeline usando LocalDagRunner . Em seguida, personalize o modelo para atender às suas necessidades.

Personalize seu pipeline

Esta seção fornece uma visão geral de como começar a personalizar seu modelo.

  1. Projete seu pipeline. A estrutura fornecida por um modelo ajuda a implementar um pipeline para dados tabulares usando os componentes padrão do TFX. Se você estiver migrando um fluxo de trabalho de ML existente para um pipeline, talvez seja necessário revisar seu código para aproveitar ao máximo os componentes padrão do TFX . Você também pode precisar criar componentes personalizados que implementem recursos exclusivos do seu fluxo de trabalho ou que ainda não sejam suportados pelos componentes padrão do TFX.

  2. Depois de projetar seu pipeline, personalize-o iterativamente usando o processo a seguir. Comece pelo componente que ingere dados em seu pipeline, que geralmente é o componente ExampleGen .

    1. Personalize o pipeline ou um componente de acordo com seu caso de uso. Essas personalizações podem incluir alterações como:

      • Alterando parâmetros do pipeline.
      • Adicionar componentes ao pipeline ou removê-los.
      • Substituindo a fonte de entrada de dados. Essa fonte de dados pode ser um arquivo ou consultas em serviços como o BigQuery.
      • Alterar a configuração de um componente no pipeline.
      • Alterar a função de personalização de um componente.
    2. Execute o componente localmente usando o script local_runner.py ou outro executor DAG apropriado se você estiver usando um orquestrador diferente. Se o script falhar, depure a falha e tente executar o script novamente.

    3. Assim que essa personalização estiver funcionando, passe para a próxima personalização.

  3. Trabalhando de forma iterativa, você pode personalizar cada etapa do fluxo de trabalho do modelo para atender às suas necessidades.

Crie um pipeline personalizado

Use as instruções a seguir para saber mais sobre como criar um pipeline personalizado sem usar um modelo.

  1. Projete seu pipeline. Os componentes padrão do TFX fornecem funcionalidade comprovada para ajudá-lo a implementar um fluxo de trabalho de ML completo. Se você estiver migrando um fluxo de trabalho de ML existente para um pipeline, talvez seja necessário revisar seu código para aproveitar ao máximo os componentes padrão do TFX. Você também pode precisar criar componentes personalizados que implementem recursos como aumento de dados.

  2. Crie um arquivo de script para definir seu pipeline usando o exemplo a seguir. Este guia refere-se a este arquivo como my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, <!-- needed? -->
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()

    Nas próximas etapas, você definirá seu pipeline em create_pipeline e executará seu pipeline localmente usando o executor local.

    Crie iterativamente seu pipeline usando o processo a seguir.

    1. Personalize o pipeline ou um componente de acordo com seu caso de uso. Essas personalizações podem incluir alterações como:

      • Alterando parâmetros do pipeline.
      • Adicionar componentes ao pipeline ou removê-los.
      • Substituindo um arquivo de entrada de dados.
      • Alterar a configuração de um componente no pipeline.
      • Alterar a função de personalização de um componente.
    2. Execute o componente localmente usando o executor local ou executando o script diretamente. Se o script falhar, depure a falha e tente executar o script novamente.

    3. Assim que essa personalização estiver funcionando, passe para a próxima personalização.

    Comece no primeiro nó do fluxo de trabalho do seu pipeline, normalmente o primeiro nó ingere dados no seu pipeline.

  3. Adicione o primeiro nó do fluxo de trabalho ao pipeline. Neste exemplo, o pipeline usa o componente padrão ExampleGen para carregar um CSV de um diretório em ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, <!-- needed? -->
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)

    CsvExampleGen cria registros de exemplo serializados usando os dados do CSV no caminho de dados especificado. Definindo o parâmetro input_base do componente CsvExampleGen com a raiz de dados.

  4. Crie um diretório data no mesmo diretório que my_pipeline.py . Adicione um pequeno arquivo CSV ao diretório data .

  5. Use o seguinte comando para executar seu script my_pipeline.py .

    python my_pipeline.py
    

    O resultado deve ser algo como o seguinte:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Continue a adicionar componentes iterativamente ao seu pipeline.