O TFX torna mais fácil orquestrar seu fluxo de trabalho de aprendizado de máquina (ML) como um pipeline, para:
- Automatize seu processo de ML, o que permite treinar, avaliar e implantar regularmente seu modelo.
- Crie pipelines de ML que incluem uma análise profunda do desempenho do modelo e validação de modelos recém-treinados para garantir o desempenho e a confiabilidade.
- Monitore os dados de treinamento em busca de anomalias e elimine a distorção do serviço de treinamento
- Aumente a velocidade de experimentação executando um pipeline com diferentes conjuntos de hiperparâmetros.
Um processo típico de desenvolvimento de pipeline começa em uma máquina local, com análise de dados e configuração de componentes, antes de ser implantado na produção. Este guia descreve duas maneiras de construir um pipeline localmente.
- Personalize um modelo de pipeline TFX para atender às necessidades de seu fluxo de trabalho de ML. Os modelos de pipeline TFX são fluxos de trabalho pré-construídos que demonstram as melhores práticas usando os componentes padrão TFX.
- Construa um pipeline usando TFX. Nesse caso de uso, você define um pipeline sem iniciar a partir de um modelo.
Como você está desenvolvendo seu pipeline, você pode executá-lo com LocalDagRunner
. Então, depois que os componentes do pipeline foram bem definidos e testados, você usaria um orquestrador de nível de produção, como Kubeflow ou Airflow.
Antes de você começar
TFX é um pacote Python, então você precisará configurar um ambiente de desenvolvimento Python, como um ambiente virtual ou um contêiner Docker. Então:
pip install tfx
Se você é novo para pipelines TFX, aprender mais sobre os conceitos fundamentais para condutas TFX antes de continuar.
Construir um pipeline usando um modelo
Os modelos de pipeline TFX facilitam o início do desenvolvimento de pipeline, fornecendo um conjunto pré-construído de definições de pipeline que você pode personalizar para seu caso de uso.
As seções a seguir descrevem como criar uma cópia de um modelo e personalizá-lo para atender às suas necessidades.
Crie uma cópia do modelo de pipeline
Veja a lista dos modelos de pipeline TFX disponíveis:
tfx template list
Selecione um modelo da lista
tfx template copy --model=template --pipeline_name=pipeline-name \ --destination_path=destination-path
Substitua o seguinte:
- template : O nome do modelo que você deseja copiar.
- pipeline-name : O nome do gasoduto para criar.
- destination-path : O caminho para copiar o modelo para.
Saiba mais sobre a
tfx template copy
comando .Uma cópia do modelo de pipeline foi criada no caminho que você especificou.
Explore o modelo de pipeline
Esta seção fornece uma visão geral da estrutura criada por um modelo.
Explore os diretórios e arquivos que foram copiados para o diretório raiz do pipeline
- Um diretório de pipeline com
-
pipeline.py
- define o gasoduto, e listas de quais componentes estão sendo usados -
configs.py
detalhes de configuração de retenção, tais como onde os dados são provenientes de ou que orquestrador está sendo usado -
-
- Um diretório de dados
- Isso normalmente contém uma
data.csv
arquivo, que é a fonte padrão paraExampleGen
. Você pode alterar a fonte de dados emconfigs.py
.
- Isso normalmente contém uma
Um diretório de modelos com código e modelo de pré-processamento implementações
O modelo copia os executores DAG para o ambiente local e Kubeflow.
Alguns modelos também incluem Notebooks Python para que você possa explorar seus dados e artefatos com MetaDados de aprendizado de máquina.
- Um diretório de pipeline com
Execute os seguintes comandos no diretório do pipeline:
tfx pipeline create --pipeline_path local_runner.py
tfx run create --pipeline_name pipeline_name
O comando cria uma corrida gasoduto usando
LocalDagRunner
, que adiciona os seguintes diretórios para seu pipeline:- Um diretório tfx_metadata que contém o armazenamento ML Metadados usado localmente.
- Um diretório tfx_pipeline_output que contém saídas de arquivo do gasoduto.
Abra o seu pipeline de
pipeline/configs.py
arquivo e revisar o conteúdo. Este script define as opções de configuração usadas pelo pipeline e as funções do componente. É aqui que você especifica coisas como a localização da fonte de dados ou o número de etapas de treinamento em uma execução.Abra o seu pipeline de
pipeline/pipeline.py
arquivo e revisar o conteúdo. Este script cria o pipeline TFX. Inicialmente, o gasoduto contém apenas umExampleGen
componente.- Siga as instruções nos comentários TODO em
pipeline.py
para adicionar mais passos para o pipeline.
- Siga as instruções nos comentários TODO em
Abrir
local_runner.py
arquivo e revisão do conteúdo. Este script cria uma corrida gasoduto e especifica os parâmetros do funcionamento, tais como adata_path
epreprocessing_fn
.Você analisou o andaime criado pelo modelo e criou uma corrida gasoduto usando
LocalDagRunner
. Em seguida, personalize o modelo para atender aos seus requisitos.
Personalize seu pipeline
Esta seção fornece uma visão geral de como começar a personalizar seu modelo.
Projete seu pipeline. A estrutura que um modelo fornece ajuda a implementar um pipeline para dados tabulares usando os componentes padrão TFX. Se você estiver movendo um fluxo de trabalho ML existente em um gasoduto, você pode precisar de rever o seu código para fazer pleno uso de componentes padrão TFX . Você também pode precisar para criar componentes personalizados que implementam características que são únicas para o seu fluxo de trabalho ou que ainda não são suportados por componentes padrão TFX.
Depois de projetar seu pipeline, personalize iterativamente o pipeline usando o seguinte processo. Iniciar a partir do componente que ingere dados em seu pipeline, que é geralmente o
ExampleGen
componente.Personalize o pipeline ou um componente para se adequar ao seu caso de uso. Essas personalizações podem incluir alterações como:
- Alterar os parâmetros do pipeline.
- Adicionar componentes ao pipeline ou removê-los.
- Substituindo a fonte de entrada de dados. Essa fonte de dados pode ser um arquivo ou consultas em serviços como o BigQuery.
- Alterar a configuração de um componente no pipeline.
- Alterar a função de personalização de um componente.
Executar o componente localmente usando o
local_runner.py
script ou outro corredor DAG adequado se você estiver usando um orquestrador diferente. Se o script falhar, depure a falha e tente executar o script novamente.Quando essa personalização estiver funcionando, vá para a próxima personalização.
Trabalhando iterativamente, você pode personalizar cada etapa do fluxo de trabalho do modelo para atender às suas necessidades.
Construir um pipeline personalizado
Use as instruções a seguir para saber mais sobre como construir um pipeline personalizado sem usar um modelo.
Projete seu pipeline. Os componentes padrão TFX fornecem funcionalidade comprovada para ajudá-lo a implementar um fluxo de trabalho de ML completo. Se você estiver movendo um fluxo de trabalho de ML existente para um pipeline, pode ser necessário revisar seu código para fazer uso total dos componentes padrão do TFX. Você também pode precisar para criar componentes personalizados que implementam funcionalidades, tais como o aumento de dados.
- Saiba mais sobre os componentes TFX padrão .
- Saiba mais sobre componentes personalizados .
Crie um arquivo de script para definir seu pipeline usando o exemplo a seguir. Este guia refere-se a este ficheiro como
my_pipeline.py
.import os from typing import Optional, Text, List from absl import logging from ml_metadata.proto import metadata_store_pb2 import tfx.v1 as tfx PIPELINE_NAME = 'my_pipeline' PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output') METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db') ENABLE_CACHE = True def create_pipeline( pipeline_name: Text, pipeline_root:Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline) if __name__ == '__main__': logging.set_verbosity(logging.INFO) run_pipeline()
Nas próximas etapas, você define seu pipeline em
create_pipeline
e executar o seu pipeline localmente usando o corredor local.Construa iterativamente seu pipeline usando o seguinte processo.
Personalize o pipeline ou um componente para se adequar ao seu caso de uso. Essas personalizações podem incluir alterações como:
- Alterar os parâmetros do pipeline.
- Adicionar componentes ao pipeline ou removê-los.
- Substituindo um arquivo de entrada de dados.
- Alterar a configuração de um componente no pipeline.
- Alterar a função de personalização de um componente.
Execute o componente localmente usando o executor local ou executando o script diretamente. Se o script falhar, depure a falha e tente executar o script novamente.
Quando essa personalização estiver funcionando, vá para a próxima personalização.
Comece do primeiro nó no fluxo de trabalho do pipeline; normalmente, o primeiro nó ingere dados no pipeline.
Adicione o primeiro nó do fluxo de trabalho ao pipeline. Neste exemplo, o gasoduto usa o
ExampleGen
componente padrão para carregar um arquivo CSV de um diretório em./data
.from tfx.components import CsvExampleGen DATA_PATH = os.path.join('.', 'data') def create_pipeline( pipeline_name: Text, pipeline_root:Text, data_path: Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] example_gen = tfx.components.CsvExampleGen(input_base=data_path) components.append(example_gen) return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, data_path=DATA_PATH, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline)
CsvExampleGen
cria serializados exemplo registos utilizando os dados na CSV no caminho de dados especificado. Ao definir oCsvExampleGen
do componenteinput_base
parâmetro com a raiz de dados.Criar uma
data
diretório no mesmo diretório quemy_pipeline.py
. Adicionar um pequeno arquivo CSV com odata
do diretório.Use o seguinte comando para executar o seu
my_pipeline.py
script.python my_pipeline.py
O resultado deve ser algo como o seguinte:
INFO:absl:Component CsvExampleGen depends on []. INFO:absl:Component CsvExampleGen is scheduled. INFO:absl:Component CsvExampleGen is running. INFO:absl:Running driver for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Running executor for CsvExampleGen INFO:absl:Generating examples. INFO:absl:Using 1 process(es) for Local pipeline execution. INFO:absl:Processing input csv data ./data/* to TFExample. WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Running publisher for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished.
Continue a adicionar componentes iterativamente ao pipeline.