Algunos conjuntos de datos son demasiado grandes para ser procesados en una sola máquina. tfds
admite la generación de datos en muchas máquinas mediante el uso de Apache Beam .
Este documento tiene dos secciones:
- Para usuarios que desean generar un conjunto de datos Beam existente
- Para desarrolladores que desean crear un nuevo conjunto de datos de Beam
Generación de un conjunto de datos de haz
A continuación se muestran diferentes ejemplos de cómo generar un conjunto de datos Beam, tanto en la nube como localmente.
En el flujo de datos de la nube de Google
Para ejecutar la canalización con Google Cloud Dataflow y aprovechar el cómputo distribuido, primero siga las instrucciones de inicio rápido .
Una vez que su entorno esté configurado, puede ejecutar la CLI tfds build
usando un directorio de datos en GCS y especificando las opciones requeridas para el indicador --beam_pipeline_options
.
Para que sea más fácil iniciar la secuencia de comandos, es útil definir las siguientes variables usando los valores reales para su configuración de GCP/GCS y el conjunto de datos que desea generar:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Luego deberá crear un archivo para decirle a Dataflow que instale tfds
en los trabajadores:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Si usa tfds-nightly
, asegúrese de repetir desde tfds-nightly
en caso de que el conjunto de datos se haya actualizado desde la última versión.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Finalmente, puede iniciar el trabajo usando el siguiente comando:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
En la zona
Para ejecutar su secuencia de comandos localmente utilizando el ejecutor predeterminado de Apache Beam (debe caber todos los datos en la memoria), el comando es el mismo que para otros conjuntos de datos:
tfds build my_dataset
Con Apache Flink
Para ejecutar la canalización con Apache Flink, puede leer la documentación oficial . Asegúrese de que su Beam cumpla con la compatibilidad de versiones de Flink
Para que sea más fácil iniciar el script, es útil definir las siguientes variables utilizando los valores reales para su configuración de Flink y el conjunto de datos que desea generar:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Para ejecutar en un clúster de Flink incorporado, puede iniciar el trabajo con el siguiente comando:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Con un guión personalizado
Para generar el conjunto de datos en Beam, la API es la misma que para otros conjuntos de datos. Puede personalizar beam.Pipeline
usando los argumentos beam_options
(y beam_runner
) de DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implementando un conjunto de datos Beam
requisitos previos
Para escribir conjuntos de datos de Apache Beam, debe estar familiarizado con los siguientes conceptos:
- Familiarícese con la guía de creación de conjuntos de datos
tfds
, ya que la mayor parte del contenido aún se aplica a los conjuntos de datos de Beam. - Obtenga una introducción a Apache Beam con la guía de programación de Beam .
- Si desea generar su conjunto de datos con Cloud Dataflow, lea la documentación de Google Cloud y la guía de dependencia de Apache Beam .
Instrucciones
Si está familiarizado con la guía de creación de conjuntos de datos , agregar un conjunto de datos Beam solo requiere modificar la función _generate_examples
. La función debería devolver un objeto de haz, en lugar de un generador:
Conjunto de datos sin haz:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Conjunto de datos de haz:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Todo lo demás puede ser 100% idéntico, incluidas las pruebas.
Algunas consideraciones adicionales:
- Utilice
tfds.core.lazy_imports
para importar Apache Beam. Mediante el uso de una dependencia diferida, los usuarios aún pueden leer el conjunto de datos después de que se haya generado sin tener que instalar Beam. - Tenga cuidado con los cierres de Python. Al ejecutar la canalización, las funciones
beam.Map
ybeam.DoFn
se serializan mediantepickle
y se envían a todos los trabajadores. No use objetos mutables dentro de unabeam.PTransform
si el estado debe compartirse entre los trabajadores. - Debido a la forma en que
tfds.core.DatasetBuilder
se serializa con pickle, la mutacióntfds.core.DatasetBuilder
durante la creación de datos se ignorará en los trabajadores (por ejemplo, no es posible configurarself.info.metadata['offset'] = 123
en_split_generators
y acceda a él desde los trabajadores comobeam.Map(lambda x: x + self.info.metadata['offset'])
- Si necesita compartir algunos pasos de canalización entre las divisiones, puede agregar una
pipeline: beam.Pipeline
kwarg to_split_generator
y controlar la canalización de generación completa. Consulte la documentación de_generate_examples
detfds.core.GeneratorBasedBuilder
.
Ejemplo
Este es un ejemplo de un conjunto de datos Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Ejecutando su canalización
Para ejecutar la canalización, eche un vistazo a la sección anterior.
tfds build my_dataset --register_checksums
Pipeline usando TFDS como entrada
Si desea crear una tubería de haz que tome un conjunto de datos TFDS como fuente, puede usar tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Procesará cada fragmento del conjunto de datos en paralelo.