Generando grandes conjuntos de datos con Apache Beam

Algunos conjuntos de datos son demasiado grandes para ser procesados ​​en una sola máquina. tfds admite la generación de datos en muchas máquinas mediante el uso de Apache Beam .

Este documento tiene dos secciones:

  • Para usuarios que desean generar un conjunto de datos Beam existente
  • Para desarrolladores que desean crear un nuevo conjunto de datos de Beam

Generación de un conjunto de datos de haz

A continuación se muestran diferentes ejemplos de cómo generar un conjunto de datos Beam, tanto en la nube como localmente.

En el flujo de datos de la nube de Google

Para ejecutar la canalización con Google Cloud Dataflow y aprovechar el cómputo distribuido, primero siga las instrucciones de inicio rápido .

Una vez que su entorno esté configurado, puede ejecutar la CLI tfds build usando un directorio de datos en GCS y especificando las opciones requeridas para el indicador --beam_pipeline_options .

Para que sea más fácil iniciar la secuencia de comandos, es útil definir las siguientes variables usando los valores reales para su configuración de GCP/GCS y el conjunto de datos que desea generar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Luego deberá crear un archivo para decirle a Dataflow que instale tfds en los trabajadores:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si usa tfds-nightly , asegúrese de repetir desde tfds-nightly en caso de que el conjunto de datos se haya actualizado desde la última versión.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Finalmente, puede iniciar el trabajo usando el siguiente comando:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

En la zona

Para ejecutar su secuencia de comandos localmente utilizando el ejecutor predeterminado de Apache Beam (debe caber todos los datos en la memoria), el comando es el mismo que para otros conjuntos de datos:

tfds build my_dataset

Para ejecutar la canalización con Apache Flink, puede leer la documentación oficial . Asegúrese de que su Beam cumpla con la compatibilidad de versiones de Flink

Para que sea más fácil iniciar el script, es útil definir las siguientes variables utilizando los valores reales para su configuración de Flink y el conjunto de datos que desea generar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Para ejecutar en un clúster de Flink incorporado, puede iniciar el trabajo con el siguiente comando:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Con un guión personalizado

Para generar el conjunto de datos en Beam, la API es la misma que para otros conjuntos de datos. Puede personalizar beam.Pipeline usando los argumentos beam_options (y beam_runner ) de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementando un conjunto de datos Beam

requisitos previos

Para escribir conjuntos de datos de Apache Beam, debe estar familiarizado con los siguientes conceptos:

Instrucciones

Si está familiarizado con la guía de creación de conjuntos de datos , agregar un conjunto de datos Beam solo requiere modificar la función _generate_examples . La función debería devolver un objeto de haz, en lugar de un generador:

Conjunto de datos sin haz:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Conjunto de datos de haz:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Todo lo demás puede ser 100% idéntico, incluidas las pruebas.

Algunas consideraciones adicionales:

  • Utilice tfds.core.lazy_imports para importar Apache Beam. Mediante el uso de una dependencia diferida, los usuarios aún pueden leer el conjunto de datos después de que se haya generado sin tener que instalar Beam.
  • Tenga cuidado con los cierres de Python. Al ejecutar la canalización, las funciones beam.Map y beam.DoFn se serializan mediante pickle y se envían a todos los trabajadores. No use objetos mutables dentro de una beam.PTransform si el estado debe compartirse entre los trabajadores.
  • Debido a la forma en que tfds.core.DatasetBuilder se serializa con pickle, la mutación tfds.core.DatasetBuilder durante la creación de datos se ignorará en los trabajadores (por ejemplo, no es posible configurar self.info.metadata['offset'] = 123 en _split_generators y acceda a él desde los trabajadores como beam.Map(lambda x: x + self.info.metadata['offset'])
  • Si necesita compartir algunos pasos de canalización entre las divisiones, puede agregar una pipeline: beam.Pipeline kwarg to _split_generator y controlar la canalización de generación completa. Consulte la documentación de _generate_examples de tfds.core.GeneratorBasedBuilder .

Ejemplo

Este es un ejemplo de un conjunto de datos Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Ejecutando su canalización

Para ejecutar la canalización, eche un vistazo a la sección anterior.

tfds build my_dataset --register_checksums

Pipeline usando TFDS como entrada

Si desea crear una tubería de haz que tome un conjunto de datos TFDS como fuente, puede usar tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Procesará cada fragmento del conjunto de datos en paralelo.