Niektóre zbiory danych są zbyt duże, aby można je było przetwarzać na jednej maszynie. tfds
obsługuje generowanie danych na wielu komputerach przy użyciu Apache Beam .
Ten dokument składa się z dwóch sekcji:
- Dla użytkownika, który chce wygenerować istniejący zbiór danych Beam
- Dla programistów, którzy chcą utworzyć nowy zestaw danych Beam
Generowanie zestawu danych Beam
Poniżej znajdują się różne przykłady generowania zestawu danych Beam, zarówno w chmurze, jak i lokalnie.
W Google Cloud Dataflow
Aby uruchomić potok przy użyciu Google Cloud Dataflow i skorzystać z obliczeń rozproszonych, najpierw postępuj zgodnie z instrukcjami szybkiego startu .
Po skonfigurowaniu środowiska możesz uruchomić interfejs CLI tfds build
, korzystając z katalogu danych w GCS i określając wymagane opcje dla flagi --beam_pipeline_options
.
Aby ułatwić uruchomienie skryptu, warto zdefiniować następujące zmienne, korzystając z rzeczywistych wartości konfiguracji GCP/GCS i zbioru danych, który chcesz wygenerować:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Następnie będziesz musiał utworzyć plik, aby poinformować Dataflow o zainstalowaniu tfds
na pracownikach:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Jeśli używasz tfds-nightly
, pamiętaj o powtórzeniu polecenia tfds-nightly
na wypadek, gdyby zbiór danych został zaktualizowany od ostatniej wersji.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Jeśli używasz dodatkowych zależności, których nie ma w bibliotece TFDS, postępuj zgodnie z instrukcjami zarządzania zależnościami potoków Pythona .
Na koniec możesz uruchomić zadanie za pomocą poniższego polecenia:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Lokalnie
Aby uruchomić skrypt lokalnie przy użyciu domyślnego modułu uruchamiającego Apache Beam (musi zmieścić wszystkie dane w pamięci), polecenie jest takie samo jak w przypadku innych zestawów danych:
tfds build my_dataset
Z Apache Flink
Aby uruchomić potok za pomocą Apache Flink, możesz przeczytać oficjalną dokumentację . Upewnij się, że Twój Beam jest zgodny z kompatybilnością wersji Flink
Aby ułatwić uruchomienie skryptu, pomocne jest zdefiniowanie następujących zmiennych przy użyciu rzeczywistych wartości konfiguracji Flink i zestawu danych, który chcesz wygenerować:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Aby uruchomić zadanie na osadzonym klastrze Flink, możesz uruchomić zadanie za pomocą poniższego polecenia:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Z niestandardowym skryptem
Aby wygenerować zbiór danych w Beam, API jest takie samo jak w przypadku innych zbiorów danych. Możesz dostosować beam.Pipeline
za pomocą argumentów beam_options
(i beam_runner
) DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implementacja zbioru danych Beam
Warunki wstępne
Aby pisać zbiory danych Apache Beam, powinieneś znać następujące pojęcia:
- Zapoznaj się z przewodnikiem tworzenia zestawu danych
tfds
, ponieważ większość treści nadal dotyczy zestawów danych Beam. - Zapoznaj się z wprowadzeniem do Apache Beam dzięki przewodnikowi programowania Beam .
- Jeśli chcesz wygenerować swój zbiór danych za pomocą Cloud Dataflow, przeczytaj dokumentację Google Cloud i przewodnik po zależnościach Apache Beam .
Instrukcje
Jeśli znasz przewodnik tworzenia zestawu danych , dodanie zestawu danych Beam wymaga jedynie zmodyfikowania funkcji _generate_examples
. Funkcja powinna zwrócić obiekt belki, a nie generator:
Zbiór danych inny niż wiązka:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Zbiór danych belki:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Cała reszta może być w 100% identyczna, łącznie z testami.
Kilka dodatkowych uwag:
- Użyj
tfds.core.lazy_imports
, aby zaimportować Apache Beam. Korzystając z leniwej zależności, użytkownicy mogą nadal czytać zbiór danych po jego wygenerowaniu, bez konieczności instalowania Beama. - Bądź ostrożny z zamknięciami Pythona. Podczas uruchamiania potoku funkcje
beam.Map
ibeam.DoFn
są serializowane za pomocąpickle
i wysyłane do wszystkich procesów roboczych. Nie używaj zmiennych obiektów wewnątrzbeam.PTransform
, jeśli stan ma być współdzielony między pracownikami. - Ze względu na sposób serializacji
tfds.core.DatasetBuilder
za pomocą pickle, mutowanietfds.core.DatasetBuilder
podczas tworzenia danych zostanie zignorowane na pracownikach (np. nie jest możliwe ustawienieself.info.metadata['offset'] = 123
w_split_generators
i uzyskaj do niego dostęp z poziomu pracowników, takich jakbeam.Map(lambda x: x + self.info.metadata['offset'])
) - Jeśli chcesz udostępnić niektóre kroki potoku pomiędzy podziałami, możesz dodać dodatkowy
pipeline: beam.Pipeline
kwarg do_split_generator
i kontrolować potok pełnej generacji. Zobacz dokumentację_generate_examples
tfds.core.GeneratorBasedBuilder
.
Przykład
Oto przykład zestawu danych Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Uruchamianie rurociągu
Aby uruchomić potok, zapoznaj się z powyższą sekcją.
tfds build my_dataset --register_checksums
Potok wykorzystujący TFDS jako dane wejściowe
Jeśli chcesz utworzyć potok wiązki, który jako źródło przyjmuje zbiór danych TFDS, możesz użyć tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Będzie przetwarzać każdy fragment zbioru danych równolegle.