Некоторые наборы данных слишком велики для обработки на одном компьютере. tfds
поддерживает генерацию данных на многих машинах с помощью Apache Beam .
Этот документ состоит из двух разделов:
- Для пользователей, которые хотят создать существующий набор данных Beam
- Для разработчиков, которые хотят создать новый набор данных Beam.
Создание набора данных Beam
Ниже приведены различные примеры создания набора данных Beam как в облаке, так и локально.
В потоке данных Google Cloud
Чтобы запустить конвейер с помощью Google Cloud Dataflow и воспользоваться преимуществами распределенных вычислений, сначала следуйте инструкциям по быстрому запуску .
После настройки среды вы можете запустить CLI tfds build
, используя каталог данных в GCS и указав необходимые параметры для флага --beam_pipeline_options
.
Чтобы упростить запуск сценария, полезно определить следующие переменные, используя фактические значения для вашей настройки GCP/GCS и набора данных, который вы хотите сгенерировать:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Затем вам нужно будет создать файл, чтобы указать Dataflow установить tfds
на рабочих:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Если вы используете tfds-nightly
, обязательно используйте эхо-сигнал tfds-nightly
, если набор данных был обновлен с момента последнего выпуска.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Если вы используете дополнительные зависимости, не включенные в библиотеку TFDS, следуйте инструкциям по управлению зависимостями конвейера Python .
Наконец, вы можете запустить задание, используя команду ниже:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Локально
Чтобы запустить скрипт локально с помощью средства запуска Apache Beam по умолчанию (он должен уместить все данные в памяти), команда такая же, как и для других наборов данных:
tfds build my_dataset
С Apache Flink
Чтобы запустить конвейер с помощью Apache Flink, вы можете прочитать официальную документацию . Убедитесь, что ваш Beam соответствует совместимости версий Flink.
Чтобы упростить запуск сценария, полезно определить следующие переменные, используя фактические значения для вашей настройки Flink и набора данных, который вы хотите сгенерировать:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Чтобы запустить задание во встроенном кластере Flink, вы можете запустить задание с помощью следующей команды:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
С пользовательским скриптом
Для создания набора данных в Beam используется тот же API, что и для других наборов данных. Вы можете настроить beam.Pipeline
используя аргументы beam_options
(и beam_runner
) DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Реализация набора данных Beam
Предварительные условия
Чтобы писать наборы данных Apache Beam, вы должны быть знакомы со следующими понятиями:
- Ознакомьтесь с руководством по созданию набора данных
tfds
, поскольку большая часть содержимого по-прежнему применима к наборам данных Beam. - Познакомьтесь с Apache Beam с помощью руководства по программированию Beam .
- Если вы хотите создать набор данных с помощью Cloud Dataflow, прочтите документацию Google Cloud и руководство по зависимостям Apache Beam .
Инструкции
Если вы знакомы с руководством по созданию набора данных , для добавления набора данных Beam требуется всего лишь изменить функцию _generate_examples
. Функция должна возвращать объект балки, а не генератор:
Нелучевой набор данных:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Набор данных луча:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Все остальное может быть на 100% идентично, включая тесты.
Некоторые дополнительные соображения:
- Используйте
tfds.core.lazy_imports
для импорта Apache Beam. Используя отложенную зависимость, пользователи могут читать набор данных после его создания без необходимости устанавливать Beam. - Будьте осторожны с замыканиями Python. При запуске конвейера функции
beam.Map
иbeam.DoFn
сериализуются с помощьюpickle
и отправляются всем воркерам. Не используйте изменяемые объекты внутриbeam.PTransform
, если состояние необходимо разделить между работниками. - Из-за того, что
tfds.core.DatasetBuilder
сериализуется с помощью Pickle, мутацияtfds.core.DatasetBuilder
во время создания данных будет игнорироваться в рабочих процессах (например, невозможно установитьself.info.metadata['offset'] = 123
в_split_generators
и получить к нему доступ от рабочих, напримерbeam.Map(lambda x: x + self.info.metadata['offset'])
) - Если вам нужно разделить некоторые этапы конвейера между разделениями, вы можете добавить дополнительный
pipeline: beam.Pipeline
kwarg в_split_generator
и управлять полным конвейером генерации. См. документацию_generate_examples
дляtfds.core.GeneratorBasedBuilder
.
Пример
Вот пример набора данных Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Запуск вашего конвейера
Чтобы запустить конвейер, ознакомьтесь с приведенным выше разделом.
tfds build my_dataset --register_checksums
Конвейер, использующий TFDS в качестве входных данных
Если вы хотите создать лучевой конвейер, который принимает набор данных TFDS в качестве источника, вы можете использовать tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Он будет обрабатывать каждый фрагмент набора данных параллельно.