O TFX facilita a orquestração do fluxo de trabalho de machine learning (ML) como um pipeline, para:
- Automatize seu processo de ML, o que permite treinar, avaliar e implantar regularmente seu modelo.
- Crie pipelines de ML que incluem análise profunda do desempenho do modelo e validação de modelos recém-treinados para garantir desempenho e confiabilidade.
- Monitore dados de treinamento em busca de anomalias e elimine distorções no fornecimento de treinamento
- Aumente a velocidade da experimentação executando um pipeline com diferentes conjuntos de hiperparâmetros.
Um processo típico de desenvolvimento de pipeline começa em uma máquina local, com análise de dados e configuração de componentes, antes de ser implantado na produção. Este guia descreve duas maneiras de construir um pipeline localmente.
- Personalize um modelo de pipeline do TFX para atender às necessidades do seu fluxo de trabalho de ML. Os modelos de pipeline do TFX são fluxos de trabalho pré-construídos que demonstram as práticas recomendadas usando os componentes padrão do TFX.
- Crie um pipeline usando TFX. Neste caso de uso, você define um pipeline sem iniciar a partir de um modelo.
Ao desenvolver seu pipeline, você pode executá-lo com LocalDagRunner
. Então, uma vez que os componentes do pipeline tenham sido bem definidos e testados, você usaria um orquestrador de nível de produção, como Kubeflow ou Airflow.
Antes de começar
TFX é um pacote Python, então você precisará configurar um ambiente de desenvolvimento Python, como um ambiente virtual ou um contêiner Docker. Então:
pip install tfx
Se você é novo nos pipelines do TFX, saiba mais sobre os principais conceitos dos pipelines do TFX antes de continuar.
Crie um pipeline usando um modelo
Os modelos de pipeline do TFX facilitam o início do desenvolvimento de pipeline, fornecendo um conjunto pré-construído de definições de pipeline que você pode personalizar para seu caso de uso.
As seções a seguir descrevem como criar uma cópia de um modelo e personalizá-lo para atender às suas necessidades.
Crie uma cópia do modelo de pipeline
Veja a lista dos modelos de pipeline TFX disponíveis:
tfx template list
Selecione um modelo da lista
tfx template copy --model=template --pipeline_name=pipeline-name \ --destination_path=destination-path
Substitua o seguinte:
- template : O nome do modelo que você deseja copiar.
- pipeline-name : o nome do pipeline a ser criado.
- destination-path : o caminho para copiar o modelo.
Saiba mais sobre o comando
tfx template copy
.Uma cópia do modelo de pipeline foi criada no caminho especificado.
Explore o modelo de pipeline
Esta seção fornece uma visão geral do andaime criado por um modelo.
Explore os diretórios e arquivos que foram copiados para o diretório raiz do seu pipeline
- Um diretório de pipeline com
-
pipeline.py
– define o pipeline e lista quais componentes estão sendo usados -
configs.py
- mantém detalhes de configuração, como de onde vêm os dados ou qual orquestrador está sendo usado
-
- Um diretório de dados
- Normalmente contém um arquivo
data.csv
, que é a fonte padrão paraExampleGen
. Você pode alterar a fonte de dados emconfigs.py
.
- Normalmente contém um arquivo
Um diretório de modelos com código de pré-processamento e implementações de modelo
O modelo copia executores DAG para ambiente local e Kubeflow.
Alguns modelos também incluem Notebooks Python para que você possa explorar seus dados e artefatos com metadados de aprendizado de máquina.
- Um diretório de pipeline com
Execute os seguintes comandos no diretório do pipeline:
tfx pipeline create --pipeline_path local_runner.py
tfx run create --pipeline_name pipeline_name
O comando cria uma execução de pipeline usando
LocalDagRunner
, que adiciona os seguintes diretórios ao pipeline:- Um diretório tfx_metadata que contém o armazenamento de metadados de ML usado localmente.
- Um diretório tfx_pipeline_output que contém as saídas de arquivo do pipeline.
Abra o arquivo
pipeline/configs.py
do seu pipeline e revise o conteúdo. Este script define as opções de configuração usadas pelo pipeline e pelas funções do componente. É aqui que você especificaria coisas como a localização da fonte de dados ou o número de etapas de treinamento em uma execução.Abra o arquivo
pipeline/pipeline.py
do seu pipeline e revise o conteúdo. Este script cria o pipeline do TFX. Inicialmente, o pipeline contém apenas um componenteExampleGen
.- Siga as instruções nos comentários TODO em
pipeline.py
para adicionar mais etapas ao pipeline.
- Siga as instruções nos comentários TODO em
Abra o arquivo
local_runner.py
e revise o conteúdo. Este script cria uma execução de pipeline e especifica os parâmetros da execução, comodata_path
epreprocessing_fn
.Você revisou o andaime criado pelo modelo e criou uma execução de pipeline usando
LocalDagRunner
. Em seguida, personalize o modelo para atender às suas necessidades.
Personalize seu pipeline
Esta seção fornece uma visão geral de como começar a personalizar seu modelo.
Projete seu pipeline. A estrutura fornecida por um modelo ajuda a implementar um pipeline para dados tabulares usando os componentes padrão do TFX. Se você estiver migrando um fluxo de trabalho de ML existente para um pipeline, talvez seja necessário revisar seu código para aproveitar ao máximo os componentes padrão do TFX . Você também pode precisar criar componentes personalizados que implementem recursos exclusivos do seu fluxo de trabalho ou que ainda não sejam suportados pelos componentes padrão do TFX.
Depois de projetar seu pipeline, personalize-o iterativamente usando o processo a seguir. Comece pelo componente que ingere dados em seu pipeline, que geralmente é o componente
ExampleGen
.Personalize o pipeline ou um componente de acordo com seu caso de uso. Essas personalizações podem incluir alterações como:
- Alterando parâmetros do pipeline.
- Adicionar componentes ao pipeline ou removê-los.
- Substituindo a fonte de entrada de dados. Essa fonte de dados pode ser um arquivo ou consultas em serviços como o BigQuery.
- Alterar a configuração de um componente no pipeline.
- Alterar a função de personalização de um componente.
Execute o componente localmente usando o script
local_runner.py
ou outro executor DAG apropriado se você estiver usando um orquestrador diferente. Se o script falhar, depure a falha e tente executar o script novamente.Assim que essa personalização estiver funcionando, passe para a próxima personalização.
Trabalhando de forma iterativa, você pode personalizar cada etapa do fluxo de trabalho do modelo para atender às suas necessidades.
Crie um pipeline personalizado
Use as instruções a seguir para saber mais sobre como criar um pipeline personalizado sem usar um modelo.
Projete seu pipeline. Os componentes padrão do TFX fornecem funcionalidade comprovada para ajudá-lo a implementar um fluxo de trabalho de ML completo. Se você estiver migrando um fluxo de trabalho de ML existente para um pipeline, talvez seja necessário revisar seu código para aproveitar ao máximo os componentes padrão do TFX. Você também pode precisar criar componentes personalizados que implementem recursos como aumento de dados.
- Saiba mais sobre os componentes padrão do TFX .
- Saiba mais sobre componentes personalizados .
Crie um arquivo de script para definir seu pipeline usando o exemplo a seguir. Este guia refere-se a este arquivo como
my_pipeline.py
.import os from typing import Optional, Text, List from absl import logging from ml_metadata.proto import metadata_store_pb2 import tfx.v1 as tfx PIPELINE_NAME = 'my_pipeline' PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output') METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db') ENABLE_CACHE = True def create_pipeline( pipeline_name: Text, pipeline_root:Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, <!-- needed? --> ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline) if __name__ == '__main__': logging.set_verbosity(logging.INFO) run_pipeline()
Nas próximas etapas, você definirá seu pipeline em
create_pipeline
e executará seu pipeline localmente usando o executor local.Crie iterativamente seu pipeline usando o processo a seguir.
Personalize o pipeline ou um componente de acordo com seu caso de uso. Essas personalizações podem incluir alterações como:
- Alterando parâmetros do pipeline.
- Adicionar componentes ao pipeline ou removê-los.
- Substituindo um arquivo de entrada de dados.
- Alterar a configuração de um componente no pipeline.
- Alterar a função de personalização de um componente.
Execute o componente localmente usando o executor local ou executando o script diretamente. Se o script falhar, depure a falha e tente executar o script novamente.
Assim que essa personalização estiver funcionando, passe para a próxima personalização.
Comece no primeiro nó do fluxo de trabalho do seu pipeline, normalmente o primeiro nó ingere dados no seu pipeline.
Adicione o primeiro nó do fluxo de trabalho ao pipeline. Neste exemplo, o pipeline usa o componente padrão
ExampleGen
para carregar um CSV de um diretório em./data
.from tfx.components import CsvExampleGen DATA_PATH = os.path.join('.', 'data') def create_pipeline( pipeline_name: Text, pipeline_root:Text, data_path: Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] example_gen = tfx.components.CsvExampleGen(input_base=data_path) components.append(example_gen) return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, <!-- needed? --> ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, data_path=DATA_PATH, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline)
CsvExampleGen
cria registros de exemplo serializados usando os dados do CSV no caminho de dados especificado. Definindo o parâmetroinput_base
do componenteCsvExampleGen
com a raiz de dados.Crie um diretório
data
no mesmo diretório quemy_pipeline.py
. Adicione um pequeno arquivo CSV ao diretóriodata
.Use o seguinte comando para executar seu script
my_pipeline.py
.python my_pipeline.py
O resultado deve ser algo como o seguinte:
INFO:absl:Component CsvExampleGen depends on []. INFO:absl:Component CsvExampleGen is scheduled. INFO:absl:Component CsvExampleGen is running. INFO:absl:Running driver for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Running executor for CsvExampleGen INFO:absl:Generating examples. INFO:absl:Using 1 process(es) for Local pipeline execution. INFO:absl:Processing input csv data ./data/* to TFExample. WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Running publisher for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished.
Continue a adicionar componentes iterativamente ao seu pipeline.