Generando grandes conjuntos de datos con Apache Beam

Algunos conjuntos de datos son demasiado grandes para procesarlos en una sola máquina. tfds admite la generación de datos en muchas máquinas mediante Apache Beam .

Este documento tiene dos secciones:

  • Para usuarios que desean generar un conjunto de datos Beam existente
  • Para desarrolladores que quieran crear un nuevo conjunto de datos Beam

Generando un conjunto de datos Beam

A continuación se muestran diferentes ejemplos de generación de un conjunto de datos Beam, tanto en la nube como localmente.

En el flujo de datos de Google Cloud

Para ejecutar la canalización mediante Google Cloud Dataflow y aprovechar la computación distribuida, primero siga las instrucciones de inicio rápido .

Una vez que su entorno esté configurado, puede ejecutar la CLI tfds build usando un directorio de datos en GCS y especificando las opciones requeridas para el indicador --beam_pipeline_options .

Para facilitar el inicio del script, es útil definir las siguientes variables utilizando los valores reales para su configuración de GCP/GCS y el conjunto de datos que desea generar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Luego deberá crear un archivo para indicarle a Dataflow que instale tfds en los trabajadores:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si está utilizando tfds-nightly , asegúrese de hacer eco desde tfds-nightly en caso de que el conjunto de datos se haya actualizado desde la última versión.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si está utilizando dependencias adicionales no incluidas en la biblioteca TFDS, siga las instrucciones para administrar las dependencias de la canalización de Python .

Finalmente, puede iniciar el trabajo usando el siguiente comando:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

En la zona

Para ejecutar su script localmente usando el ejecutor Apache Beam predeterminado (debe caber todos los datos en la memoria), el comando es el mismo que para otros conjuntos de datos:

tfds build my_dataset

Para ejecutar la canalización utilizando Apache Flink, puede leer la documentación oficial . Asegúrese de que su Beam cumpla con la compatibilidad de versiones de Flink

Para que sea más fácil iniciar el script, es útil definir las siguientes variables utilizando los valores reales para su configuración de Flink y el conjunto de datos que desea generar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Para ejecutarlo en un clúster de Flink integrado, puede iniciar el trabajo usando el siguiente comando:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Con un script personalizado

Para generar el conjunto de datos en Beam, la API es la misma que para otros conjuntos de datos. Puede personalizar beam.Pipeline utilizando los argumentos beam_options (y beam_runner ) de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementación de un conjunto de datos Beam

Requisitos previos

Para escribir conjuntos de datos de Apache Beam, debe estar familiarizado con los siguientes conceptos:

Instrucciones

Si está familiarizado con la guía de creación de conjuntos de datos , agregar un conjunto de datos Beam solo requiere modificar la función _generate_examples . La función debería devolver un objeto de viga, en lugar de un generador:

Conjunto de datos sin haz:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Conjunto de datos de haz:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Todo lo demás puede ser 100% idéntico, incluidas las pruebas.

Algunas consideraciones adicionales:

  • Utilice tfds.core.lazy_imports para importar Apache Beam. Al utilizar una dependencia diferida, los usuarios aún pueden leer el conjunto de datos una vez generado sin tener que instalar Beam.
  • Tenga cuidado con los cierres de Python. Al ejecutar la tubería, las funciones beam.Map y beam.DoFn se serializan mediante pickle y se envían a todos los trabajadores. No utilice objetos mutables dentro de una beam.PTransform si el estado debe compartirse entre los trabajadores.
  • Debido a la forma en que tfds.core.DatasetBuilder se serializa con pickle, los trabajadores ignorarán la mutación tfds.core.DatasetBuilder durante la creación de datos (por ejemplo, no es posible establecer self.info.metadata['offset'] = 123 en _split_generators y acceda a él desde los trabajadores como beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Si necesita compartir algunos pasos de la canalización entre las divisiones, puede agregar una pipeline: beam.Pipeline kwarg a _split_generator y controlar la canalización de generación completa. Consulte la documentación _generate_examples de tfds.core.GeneratorBasedBuilder .

Ejemplo

A continuación se muestra un ejemplo de un conjunto de datos Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Ejecutando su canalización

Para ejecutar la tubería, eche un vistazo a la sección anterior.

tfds build my_dataset --register_checksums

Tubería que utiliza TFDS como entrada

Si desea crear una canalización de vigas que tome un conjunto de datos TFDS como fuente, puede usar tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Procesará cada fragmento del conjunto de datos en paralelo.