Costruire una pipeline TFX a livello locale

TFX semplifica l'orchestrazione del flusso di lavoro di machine learning (ML) come una pipeline, al fine di:

  • Automatizza il tuo processo ML, che ti consente di riqualificare, valutare e distribuire regolarmente il tuo modello.
  • Crea pipeline ML che includano un'analisi approfondita delle prestazioni del modello e la convalida dei modelli appena addestrati per garantire prestazioni e affidabilità.
  • Monitora i dati di addestramento per individuare eventuali anomalie ed elimina le distorsioni nella distribuzione dell'addestramento
  • Aumenta la velocità della sperimentazione eseguendo una pipeline con diversi set di iperparametri.

Un tipico processo di sviluppo della pipeline inizia su una macchina locale, con l'analisi dei dati e la configurazione dei componenti, prima di essere distribuito in produzione. Questa guida descrive due modi per creare una pipeline localmente.

  • Personalizza un modello di pipeline TFX per soddisfare le esigenze del tuo flusso di lavoro ML. I modelli di pipeline TFX sono flussi di lavoro predefiniti che dimostrano le migliori pratiche utilizzando i componenti standard TFX.
  • Costruisci una pipeline utilizzando TFX. In questo caso d'uso, definisci una pipeline senza iniziare da un modello.

Mentre sviluppi la pipeline, puoi eseguirla con LocalDagRunner . Quindi, una volta che i componenti della pipeline sono stati ben definiti e testati, utilizzeresti un orchestratore di livello produttivo come Kubeflow o Airflow.

Prima di iniziare

TFX è un pacchetto Python, quindi dovrai configurare un ambiente di sviluppo Python, come un ambiente virtuale o un contenitore Docker. Poi:

pip install tfx

Se non conosci le pipeline TFX, scopri di più sui concetti fondamentali delle pipeline TFX prima di continuare.

Costruisci una pipeline utilizzando un modello

I modelli di pipeline TFX semplificano l'avvio dello sviluppo della pipeline fornendo un set predefinito di definizioni di pipeline che puoi personalizzare per il tuo caso d'uso.

Le sezioni seguenti descrivono come creare una copia di un modello e personalizzarla per soddisfare le proprie esigenze.

Crea una copia del modello di pipeline

  1. Consulta l'elenco dei modelli di pipeline TFX disponibili:

    tfx template list
    
  2. Seleziona un modello dall'elenco

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Sostituisci quanto segue:

    • template : il nome del modello che desideri copiare.
    • pipeline-name : il nome della pipeline da creare.
    • destination-path : il percorso in cui copiare il modello.

    Ulteriori informazioni sul comando tfx template copy .

  3. Una copia del modello di pipeline è stata creata nel percorso specificato.

Esplora il modello di pipeline

Questa sezione fornisce una panoramica dell'impalcatura creata da un modello.

  1. Esplora le directory e i file che sono stati copiati nella directory root della pipeline

    • Una directory della pipeline con
      • pipeline.py : definisce la pipeline ed elenca quali componenti vengono utilizzati
      • configs.py : conserva i dettagli di configurazione come la provenienza dei dati o l'orchestratore utilizzato
    • Una directory di dati
      • Solitamente contiene un file data.csv , che è l'origine predefinita per ExampleGen . Puoi modificare l'origine dati in configs.py .
    • Una directory di modelli con codice di preelaborazione e implementazioni di modelli

    • Il modello copia i runner DAG per l'ambiente locale e Kubeflow.

    • Alcuni modelli includono anche quaderni Python in modo da poter esplorare i dati e gli artefatti con i metadati di Machine Learning.

  2. Esegui i seguenti comandi nella directory della pipeline:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    Il comando crea un'esecuzione della pipeline utilizzando LocalDagRunner , che aggiunge le seguenti directory alla pipeline:

    • Una directory tfx_metadata che contiene l'archivio metadati ML utilizzato localmente.
    • Una directory tfx_pipeline_output che contiene gli output dei file della pipeline.
  3. Apri il file pipeline/configs.py della tua pipeline ed esamina il contenuto. Questo script definisce le opzioni di configurazione utilizzate dalla pipeline e dalle funzioni del componente. Qui è dove specificherai cose come la posizione dell'origine dati o il numero di passaggi di addestramento in un'esecuzione.

  4. Apri il file pipeline/pipeline.py della tua pipeline ed esamina i contenuti. Questo script crea la pipeline TFX. Inizialmente, la pipeline contiene solo un componente ExampleGen .

    • Segui le istruzioni nei commenti TODO in pipeline.py per aggiungere ulteriori passaggi alla pipeline.
  5. Apri il file local_runner.py e rivedi il contenuto. Questo script crea un'esecuzione della pipeline e specifica i parametri dell'esecuzione, come data_path e preprocessing_fn .

  6. Hai esaminato l'impalcatura creata dal modello e creato un'esecuzione della pipeline utilizzando LocalDagRunner . Successivamente, personalizza il modello in base alle tue esigenze.

Personalizza la tua pipeline

Questa sezione fornisce una panoramica su come iniziare a personalizzare il modello.

  1. Progetta la tua pipeline. L'impalcatura fornita da un modello consente di implementare una pipeline per dati tabulari utilizzando i componenti standard TFX. Se stai spostando un flusso di lavoro ML esistente in una pipeline, potrebbe essere necessario rivedere il codice per sfruttare appieno i componenti standard TFX . Potrebbe anche essere necessario creare componenti personalizzati che implementino funzionalità uniche per il tuo flusso di lavoro o che non siano ancora supportate dai componenti standard TFX.

  2. Dopo aver progettato la pipeline, personalizzala in modo iterativo utilizzando la procedura seguente. Inizia dal componente che inserisce i dati nella tua pipeline, che solitamente è il componente ExampleGen .

    1. Personalizza la pipeline o un componente per adattarlo al tuo caso d'uso. Queste personalizzazioni possono includere modifiche come:

      • Modifica dei parametri della pipeline.
      • Aggiunta di componenti alla pipeline o rimozione degli stessi.
      • Sostituzione della sorgente di input dei dati. Questa origine dati può essere un file o query in servizi come BigQuery.
      • Modifica della configurazione di un componente nella pipeline.
      • Modifica della funzione di personalizzazione di un componente.
    2. Esegui il componente localmente utilizzando lo script local_runner.py o un altro runner DAG appropriato se utilizzi un orchestratore diverso. Se lo script fallisce, esegui il debug dell'errore e riprova a eseguire lo script.

    3. Una volta che questa personalizzazione funziona, passa alla personalizzazione successiva.

  3. Lavorando in modo iterativo, puoi personalizzare ogni passaggio del flusso di lavoro del modello per soddisfare le tue esigenze.

Costruisci una pipeline personalizzata

Utilizza le seguenti istruzioni per ulteriori informazioni sulla creazione di una pipeline personalizzata senza utilizzare un modello.

  1. Progetta la tua pipeline. I componenti standard TFX forniscono funzionalità comprovate per aiutarti a implementare un flusso di lavoro ML completo. Se stai spostando un flusso di lavoro ML esistente in una pipeline, potrebbe essere necessario rivedere il codice per sfruttare appieno i componenti standard TFX. Potrebbe anche essere necessario creare componenti personalizzati che implementino funzionalità come l'aumento dei dati.

  2. Crea un file di script per definire la pipeline utilizzando l'esempio seguente. Questa guida fa riferimento a questo file come my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, <!-- needed? -->
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()

    Nei passaggi successivi, definirai la pipeline in create_pipeline ed eseguirai la pipeline localmente utilizzando il runner locale.

    Costruisci in modo iterativo la tua pipeline utilizzando il processo seguente.

    1. Personalizza la pipeline o un componente per adattarlo al tuo caso d'uso. Queste personalizzazioni possono includere modifiche come:

      • Modifica dei parametri della pipeline.
      • Aggiunta di componenti alla pipeline o rimozione degli stessi.
      • Sostituzione di un file di input dati.
      • Modifica della configurazione di un componente nella pipeline.
      • Modifica della funzione di personalizzazione di un componente.
    2. Esegui il componente localmente utilizzando il runner locale o eseguendo direttamente lo script. Se lo script fallisce, esegui il debug dell'errore e riprova a eseguire lo script.

    3. Una volta che questa personalizzazione funziona, passa alla personalizzazione successiva.

    Inizia dal primo nodo nel flusso di lavoro della pipeline, in genere il primo nodo inserisce i dati nella pipeline.

  3. Aggiungi il primo nodo del flusso di lavoro alla pipeline. In questo esempio, la pipeline utilizza il componente standard ExampleGen per caricare un CSV da una directory in ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, <!-- needed? -->
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)

    CsvExampleGen crea record di esempio serializzati utilizzando i dati nel CSV nel percorso dati specificato. Impostando il parametro input_base del componente CsvExampleGen con la radice dei dati.

  4. Crea una directory data nella stessa directory di my_pipeline.py . Aggiungi un piccolo file CSV alla directory data .

  5. Utilizza il comando seguente per eseguire lo script my_pipeline.py .

    python my_pipeline.py
    

    Il risultato dovrebbe essere qualcosa di simile al seguente:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Continua ad aggiungere in modo iterativo i componenti alla pipeline.