Algunos conjuntos de datos son demasiado grandes para procesarlos en una sola máquina. tfds
admite la generación de datos en muchas máquinas mediante Apache Beam .
Este documento tiene dos secciones:
- Para usuarios que desean generar un conjunto de datos Beam existente
- Para desarrolladores que quieran crear un nuevo conjunto de datos Beam
Generando un conjunto de datos Beam
A continuación se muestran diferentes ejemplos de generación de un conjunto de datos Beam, tanto en la nube como localmente.
En el flujo de datos de Google Cloud
Para ejecutar la canalización mediante Google Cloud Dataflow y aprovechar la computación distribuida, primero siga las instrucciones de inicio rápido .
Una vez que su entorno esté configurado, puede ejecutar la CLI tfds build
usando un directorio de datos en GCS y especificando las opciones requeridas para el indicador --beam_pipeline_options
.
Para facilitar el inicio del script, es útil definir las siguientes variables utilizando los valores reales para su configuración de GCP/GCS y el conjunto de datos que desea generar:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Luego deberá crear un archivo para indicarle a Dataflow que instale tfds
en los trabajadores:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Si está utilizando tfds-nightly
, asegúrese de hacer eco desde tfds-nightly
en caso de que el conjunto de datos se haya actualizado desde la última versión.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Si está utilizando dependencias adicionales no incluidas en la biblioteca TFDS, siga las instrucciones para administrar las dependencias de la canalización de Python .
Finalmente, puede iniciar el trabajo usando el siguiente comando:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
En la zona
Para ejecutar su script localmente usando el ejecutor Apache Beam predeterminado (debe caber todos los datos en la memoria), el comando es el mismo que para otros conjuntos de datos:
tfds build my_dataset
Con Apache Flink
Para ejecutar la canalización utilizando Apache Flink, puede leer la documentación oficial . Asegúrese de que su Beam cumpla con la compatibilidad de versiones de Flink
Para facilitar el inicio del script, es útil definir las siguientes variables utilizando los valores reales para su configuración de Flink y el conjunto de datos que desea generar:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Para ejecutarlo en un clúster de Flink integrado, puede iniciar el trabajo usando el siguiente comando:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Con un script personalizado
Para generar el conjunto de datos en Beam, la API es la misma que para otros conjuntos de datos. Puede personalizar beam.Pipeline
utilizando los argumentos beam_options
(y beam_runner
) de DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implementación de un conjunto de datos Beam
Requisitos previos
Para escribir conjuntos de datos de Apache Beam, debe estar familiarizado con los siguientes conceptos:
- Familiarícese con la guía de creación de conjuntos de datos
tfds
, ya que la mayor parte del contenido todavía se aplica a los conjuntos de datos Beam. - Obtenga una introducción a Apache Beam con la guía de programación de Beam .
- Si desea generar su conjunto de datos utilizando Cloud Dataflow, lea la documentación de Google Cloud y la guía de dependencia de Apache Beam .
Instrucciones
Si está familiarizado con la guía de creación de conjuntos de datos , agregar un conjunto de datos Beam solo requiere modificar la función _generate_examples
. La función debería devolver un objeto de viga, en lugar de un generador:
Conjunto de datos sin haz:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Conjunto de datos de haz:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Todo lo demás puede ser 100% idéntico, incluidas las pruebas.
Algunas consideraciones adicionales:
- Utilice
tfds.core.lazy_imports
para importar Apache Beam. Al utilizar una dependencia diferida, los usuarios aún pueden leer el conjunto de datos una vez generado sin tener que instalar Beam. - Tenga cuidado con los cierres de Python. Al ejecutar la tubería, las funciones
beam.Map
ybeam.DoFn
se serializan mediantepickle
y se envían a todos los trabajadores. No utilice objetos mutables dentro de unabeam.PTransform
si el estado debe compartirse entre los trabajadores. - Debido a la forma en que
tfds.core.DatasetBuilder
se serializa con pickle, los trabajadores ignorarán la mutacióntfds.core.DatasetBuilder
durante la creación de datos (por ejemplo, no es posible establecerself.info.metadata['offset'] = 123
en_split_generators
y acceda a él desde los trabajadores comobeam.Map(lambda x: x + self.info.metadata['offset'])
) - Si necesita compartir algunos pasos de la canalización entre las divisiones, puede agregar una
pipeline: beam.Pipeline
kwarg a_split_generator
y controlar la canalización de generación completa. Consulte la documentación_generate_examples
detfds.core.GeneratorBasedBuilder
.
Ejemplo
A continuación se muestra un ejemplo de un conjunto de datos Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Ejecutando su canalización
Para ejecutar la tubería, eche un vistazo a la sección anterior.
tfds build my_dataset --register_checksums
Tubería que utiliza TFDS como entrada
Si desea crear una canalización de vigas que tome un conjunto de datos TFDS como fuente, puede usar tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Procesará cada fragmento del conjunto de datos en paralelo.