Generazione di grandi set di dati con Apache Beam

Alcuni set di dati sono troppo grandi per essere elaborati su un singolo computer. tfds supporta la generazione di dati su più macchine utilizzando Apache Beam .

Questo documento ha due sezioni:

  • Per gli utenti che desiderano generare un set di dati Beam esistente
  • Per gli sviluppatori che desiderano creare un nuovo set di dati Beam

Generazione di un set di dati Beam

Di seguito sono riportati diversi esempi di generazione di un set di dati Beam, sia sul cloud che localmente.

Su Google Cloud Dataflow

Per eseguire la pipeline utilizzando Google Cloud Dataflow e sfruttare i vantaggi del calcolo distribuito, segui innanzitutto le istruzioni di avvio rapido .

Una volta configurato l'ambiente, puoi eseguire la CLI tfds build utilizzando una directory di dati su GCS e specificando le opzioni richieste per il flag --beam_pipeline_options .

Per semplificare l'avvio dello script, è utile definire le seguenti variabili utilizzando i valori effettivi per la configurazione GCP/GCS e il set di dati che desideri generare:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Dovrai quindi creare un file per dire a Dataflow di installare tfds sui lavoratori:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Se stai utilizzando tfds-nightly , assicurati di fare eco da tfds-nightly nel caso in cui il set di dati sia stato aggiornato dall'ultima versione.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Se utilizzi dipendenze aggiuntive non incluse nella libreria TFDS, segui le istruzioni per la gestione delle dipendenze della pipeline Python .

Infine, puoi avviare il lavoro utilizzando il comando seguente:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

A livello locale

Per eseguire lo script localmente utilizzando il runner Apache Beam predefinito (deve contenere tutti i dati in memoria), il comando è lo stesso utilizzato per gli altri set di dati:

tfds build my_dataset

Per eseguire la pipeline utilizzando Apache Flink è possibile leggere la documentazione ufficiale . Assicurati che Beam sia compatibile con la compatibilità della versione Flink

Per facilitare l'avvio dello script, è utile definire le seguenti variabili utilizzando i valori effettivi per la configurazione di Flink e il set di dati che desideri generare:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Per eseguire su un cluster Flink incorporato, puoi avviare il lavoro utilizzando il comando seguente:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Con uno script personalizzato

Per generare il set di dati su Beam, l'API è la stessa degli altri set di dati. È possibile personalizzare beam.Pipeline utilizzando gli argomenti beam_options (e beam_runner ) di DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementazione di un set di dati Beam

Prerequisiti

Per scrivere set di dati Apache Beam, è necessario avere familiarità con i seguenti concetti:

Istruzioni

Se hai familiarità con la guida alla creazione del set di dati , l'aggiunta di un set di dati Beam richiede solo la modifica della funzione _generate_examples . La funzione dovrebbe restituire un oggetto trave, piuttosto che un generatore:

Set di dati non travi:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Set di dati del fascio:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Tutto il resto può essere identico al 100%, compresi i test.

Alcune considerazioni aggiuntive:

  • Utilizza tfds.core.lazy_imports per importare Apache Beam. Utilizzando una dipendenza lazy, gli utenti possono comunque leggere il set di dati dopo che è stato generato senza dover installare Beam.
  • Fai attenzione alle chiusure Python. Durante l'esecuzione della pipeline, le funzioni beam.Map e beam.DoFn vengono serializzate utilizzando pickle e inviate a tutti i lavoratori. Non utilizzare oggetti mutabili all'interno di una beam.PTransform se lo stato deve essere condiviso tra i lavoratori.
  • A causa del modo in cui tfds.core.DatasetBuilder viene serializzato con pickle, la modifica di tfds.core.DatasetBuilder durante la creazione dei dati verrà ignorata sui lavoratori (ad esempio, non è possibile impostare self.info.metadata['offset'] = 123 in _split_generators e accedervi dai lavoratori come beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Se è necessario condividere alcuni passaggi della pipeline tra le divisioni, è possibile aggiungere una pipeline: beam.Pipeline kwarg a _split_generator e controllare la pipeline di generazione completa. Consulta la documentazione _generate_examples di tfds.core.GeneratorBasedBuilder .

Esempio

Ecco un esempio di un set di dati Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Gestire la tua pipeline

Per eseguire la pipeline, dai un'occhiata alla sezione precedente.

tfds build my_dataset --register_checksums

Pipeline che utilizza TFDS come input

Se desideri creare una pipeline di travi che prenda un set di dati TFDS come origine, puoi utilizzare tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Elaborerà ogni frammento del set di dati in parallelo.