Apache Beam e TFX

Apache Beam fornisce un framework per l'esecuzione di processi di elaborazione dati in batch e in streaming eseguiti su una varietà di motori di esecuzione. Molte delle librerie TFX utilizzano Beam per l'esecuzione delle attività, il che consente un elevato grado di scalabilità tra i cluster di calcolo. Beam include il supporto per una varietà di motori di esecuzione o "corridori", incluso un corridore diretto che viene eseguito su un singolo nodo di calcolo ed è molto utile per lo sviluppo, il test o le piccole distribuzioni. Beam fornisce un livello di astrazione che consente a TFX di essere eseguito su qualsiasi runner supportato senza modifiche al codice. TFX utilizza l'API Beam Python, quindi è limitato ai corridori supportati dall'API Python.

Distribuzione e scalabilità

Con l'aumento dei requisiti del carico di lavoro, Beam può scalare fino a implementazioni molto grandi su cluster di elaborazione di grandi dimensioni. Ciò è limitato solo dalla scalabilità del corridore sottostante. I corridori in distribuzioni di grandi dimensioni verranno in genere distribuiti su un sistema di orchestrazione dei contenitori come Kubernetes o Apache Mesos per automatizzare la distribuzione, la scalabilità e la gestione delle applicazioni.

Consulta la documentazione di Apache Beam per ulteriori informazioni su Apache Beam.

Per gli utenti di Google Cloud, Dataflow è la soluzione consigliata, che fornisce una piattaforma serverless ed economicamente vantaggiosa attraverso la scalabilità automatica delle risorse, il ribilanciamento dinamico del lavoro, l'integrazione profonda con altri servizi Google Cloud, la sicurezza integrata e il monitoraggio.

Codice Python personalizzato e dipendenze

Una notevole complessità nell'utilizzo di Beam in una pipeline TFX è la gestione del codice personalizzato e/o delle dipendenze necessarie da moduli Python aggiuntivi. Ecco alcuni esempi di quando questo potrebbe rappresentare un problema:

  • preprocessing_fn deve fare riferimento al modulo Python dell'utente
  • un estrattore personalizzato per il componente Evaluator
  • moduli personalizzati che sono sottoclassati da un componente TFX

TFX si affida al supporto di Beam per la gestione delle dipendenze della pipeline Python per gestire le dipendenze Python. Attualmente ci sono due modi per gestirlo:

  1. Fornire codice Python e dipendenze come pacchetto sorgente
  2. [Solo Dataflow] Utilizzo di un'immagine contenitore come lavoratore

Questi verranno discussi di seguito.

Fornire codice Python e dipendenze come pacchetto sorgente

Questo è consigliato per gli utenti che:

  1. Hanno familiarità con il packaging Python e
  2. Utilizzare solo codice sorgente Python (ovvero, nessun modulo C o librerie condivise).

Segui uno dei percorsi in Gestione delle dipendenze della pipeline Python per fornirlo utilizzando uno dei seguenti beam_pipeline_args:

  • --setup_file
  • --pacchetto_extra
  • --requirements_file

Avviso: in uno qualsiasi dei casi precedenti, assicurati che la stessa versione di tfx sia elencata come dipendenza.

[Solo Dataflow] Utilizzo di un'immagine contenitore per un lavoratore

TFX 0.26.0 e versioni successive dispongono del supporto sperimentale per l'utilizzo dell'immagine del contenitore personalizzata per i lavoratori Dataflow.

Per utilizzare questo, devi:

  • Crea un'immagine Docker che abbia sia tfx che il codice personalizzato e le dipendenze degli utenti preinstallati.
    • Per gli utenti che (1) utilizzano tfx>=0.26 e (2) utilizzano Python 3.7 per sviluppare le proprie pipeline, il modo più semplice per farlo è estendere la versione corrispondente dell'immagine tensorflow/tfx ufficiale:
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • Invia l'immagine creata a un registro di immagini del contenitore accessibile dal progetto utilizzato da Dataflow.
    • Gli utenti di Google Cloud possono prendere in considerazione l'utilizzo di Cloud Build che automatizza perfettamente i passaggi precedenti.
  • Fornisci il seguente beam_pipeline_args :
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562): rimuovere use_runner_v2 una volta che è predefinito per Dataflow.

TODO(b/179738639): crea la documentazione su come testare il contenitore personalizzato localmente dopo https://issues.apache.org/jira/browse/BEAM-5440

Argomenti relativi alla pipeline di travi

Diversi componenti TFX si affidano a Beam per l'elaborazione distribuita dei dati. Sono configurati con beam_pipeline_args , che viene specificato durante la creazione della pipeline:

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

TFX 0.30 e versioni successive aggiungono un'interfaccia, with_beam_pipeline_args , per estendere gli argomenti della trave a livello della pipeline per componente:

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])