Alcuni set di dati sono troppo grandi per essere elaborati su un singolo computer. tfds
supporta la generazione di dati su più macchine utilizzando Apache Beam .
Questo documento ha due sezioni:
- Per gli utenti che desiderano generare un set di dati Beam esistente
- Per gli sviluppatori che desiderano creare un nuovo set di dati Beam
Generazione di un set di dati Beam
Di seguito sono riportati diversi esempi di generazione di un set di dati Beam, sia sul cloud che localmente.
Su Google Cloud Dataflow
Per eseguire la pipeline utilizzando Google Cloud Dataflow e sfruttare i vantaggi del calcolo distribuito, segui innanzitutto le istruzioni di avvio rapido .
Una volta configurato l'ambiente, puoi eseguire la CLI tfds build
utilizzando una directory di dati su GCS e specificando le opzioni richieste per il flag --beam_pipeline_options
.
Per semplificare l'avvio dello script, è utile definire le seguenti variabili utilizzando i valori effettivi per la configurazione GCP/GCS e il set di dati che desideri generare:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Dovrai quindi creare un file per dire a Dataflow di installare tfds
sui lavoratori:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Se stai utilizzando tfds-nightly
, assicurati di fare eco da tfds-nightly
nel caso in cui il set di dati sia stato aggiornato dall'ultima versione.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Se utilizzi dipendenze aggiuntive non incluse nella libreria TFDS, segui le istruzioni per la gestione delle dipendenze della pipeline Python .
Infine, puoi avviare il lavoro utilizzando il comando seguente:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
A livello locale
Per eseguire lo script localmente utilizzando il runner Apache Beam predefinito (deve contenere tutti i dati in memoria), il comando è lo stesso utilizzato per gli altri set di dati:
tfds build my_dataset
Con Apache Flink
Per eseguire la pipeline utilizzando Apache Flink è possibile leggere la documentazione ufficiale . Assicurati che Beam sia compatibile con la compatibilità della versione Flink
Per facilitare l'avvio dello script, è utile definire le seguenti variabili utilizzando i valori effettivi per la configurazione di Flink e il set di dati che desideri generare:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Per eseguire su un cluster Flink incorporato, puoi avviare il lavoro utilizzando il comando seguente:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Con uno script personalizzato
Per generare il set di dati su Beam, l'API è la stessa degli altri set di dati. È possibile personalizzare beam.Pipeline
utilizzando gli argomenti beam_options
(e beam_runner
) di DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implementazione di un set di dati Beam
Prerequisiti
Per scrivere set di dati Apache Beam, è necessario avere familiarità con i seguenti concetti:
- Acquisisci familiarità con la guida alla creazione del set di dati
tfds
poiché la maggior parte del contenuto si applica ancora ai set di dati Beam. - Ottieni un'introduzione ad Apache Beam con la guida alla programmazione di Beam .
- Se desideri generare il tuo set di dati utilizzando Cloud Dataflow, leggi la documentazione di Google Cloud e la guida alle dipendenze di Apache Beam .
Istruzioni
Se hai familiarità con la guida alla creazione del set di dati , l'aggiunta di un set di dati Beam richiede solo la modifica della funzione _generate_examples
. La funzione dovrebbe restituire un oggetto trave, piuttosto che un generatore:
Set di dati non travi:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Set di dati del fascio:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Tutto il resto può essere identico al 100%, compresi i test.
Alcune considerazioni aggiuntive:
- Utilizza
tfds.core.lazy_imports
per importare Apache Beam. Utilizzando una dipendenza lazy, gli utenti possono comunque leggere il set di dati dopo che è stato generato senza dover installare Beam. - Fai attenzione alle chiusure Python. Durante l'esecuzione della pipeline, le funzioni
beam.Map
ebeam.DoFn
vengono serializzate utilizzandopickle
e inviate a tutti i lavoratori. Non utilizzare oggetti mutabili all'interno di unabeam.PTransform
se lo stato deve essere condiviso tra i lavoratori. - A causa del modo in cui
tfds.core.DatasetBuilder
viene serializzato con pickle, la modifica ditfds.core.DatasetBuilder
durante la creazione dei dati verrà ignorata sui lavoratori (ad esempio, non è possibile impostareself.info.metadata['offset'] = 123
in_split_generators
e accedervi dai lavoratori comebeam.Map(lambda x: x + self.info.metadata['offset'])
) - Se è necessario condividere alcuni passaggi della pipeline tra le divisioni, è possibile aggiungere una
pipeline: beam.Pipeline
kwarg a_split_generator
e controllare la pipeline di generazione completa. Consulta la documentazione_generate_examples
ditfds.core.GeneratorBasedBuilder
.
Esempio
Ecco un esempio di un set di dati Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Gestire la tua pipeline
Per eseguire la pipeline, dai un'occhiata alla sezione precedente.
tfds build my_dataset --register_checksums
Pipeline che utilizza TFDS come input
Se desideri creare una pipeline di travi che prenda un set di dati TFDS come origine, puoi utilizzare tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Elaborerà ogni frammento del set di dati in parallelo.