データセットによっては、1 台のマシンで処理するには大きすぎるものがあります。tfds
は、Apache Beam を使用することによって、多くのマシンにまたがったデータ生成のサポートをします。
このドキュメントには、2 つのセクションがあります。
- 既存の Beam のデータセットを生成するユーザー向け
- 新規の Beam のデータセットを作成する開発者向け
Beam のデータセットを生成する
クラウドまたはローカルで Beam のデータセットを生成するさまざまな例を以下に紹介します。
警告: tfds build
CLI でデータセットを生成する際には、生成するデータセットの構成を必ず指定してください。指定しない場合、すべての既存の構成が生成されてしまいます。たとえばウィキペディアの場合は、tfds build wikipedia
の代わりに tfds build wikipedia/20200301.en
を使用します。
Google Cloud Dataflow で生成する
Google Cloud Dataflow を使用してパイプラインを実行し、分散計算を活用するには、まずクイックスタートの手順に従います。
環境をセットアップしたら、tfds build
CLI を実行できます。これには、GCS のデータディレクトリを使用し、必要なオプションを --beam_pipeline_options
フラグに指定します。
スクリプトの起動を容易にするためには、GCP/GCS セットアップの実際の値と生成するデータセットを使用して、以下の変数を定義すると便利です。
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
次に、ワーカーに tfds
をインストールするよう Dataflow に指示をするファイルを作成する必要があります。
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
tfds-nightly
を使用している場合には、データセットが前回のリリースから更新されている場合に備え、tfds-nightly
からエコーするようにします。
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
最後に、以下のコマンドを使用してジョブを起動します。
python -m tensorflow_datasets.scripts.download_and_prepare \
--datasets=$DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
ローカルで生成する
デフォルトの Apache Beam ランナーを使用してローカルでスクリプトを実行する場合(すべてのデータがメモリに収まる必要があります)のコマンドは、他のデータセットと同じです。
tfds build my_dataset
警告: Beam のデータセットは巨大な(テラバイト以上)場合があり、生成には相当量のリソースを必要とします(ローカルコンピュータでは数週間かかることもあります)。データセットの生成には分散環境の使用を推奨しています。サポートされているランタイムのリストについては Apache Beam ドキュメントを参照してください。
Apache Flink を使用する
Apache Flink を使用してパイプラインを実行する場合は、公式ドキュメントをお読みください。Beam が Flink のバージョン互換性に準拠していることを確認してください。
スクリプトの起動を容易にするには、Flink セットアップと生成するデータセットの実際の値を使って、以下の変数を定義すると便利です。
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
組み込みの Flink クラスタで実行するには、以下のコマンドを使用してジョブを起動できます。
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
カスタムスクリプト内で生成する
Beam でデータセットを生成する場合、API は他のデータセットの場合と同じですが、beam.Pipeline
を、DownloadConfig
の beam_options
(および beam_runner
)引数を使ってカスタマイズできます。
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Beam のデータセットを実装する
前提条件
Apache Beam のデータセットを書き込むにあたり、以下の概念を理解しておく必要があります。
tfds
データセット作成ガイドをよく読みましょう。ほとんどの内容は Beam データセットにも適用されます。- Beam プログラミングガイドで Apache Beam の概要を把握しましょう。
- Cloud Dataflow を使用してデータセットを生成する場合は、Google Cloud ドキュメント と Apache Beam 依存性ガイドをお読みください。
手順
データセット作成ガイドを理解しているのであれば、Beam データセットの追加には、_generate_examples
関数を変更することだけが必要です。この関数はジェネレータではなく beam オブジェクトを返します。
Beam 以外のデータセット:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Beam データセット:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
その他すべては、テストも含め、まったく同じになります。
その他の考慮事項 :
- Apache Beam のインポートには、
tfds.core.lazy_imports
を使用します。遅延依存関係を使用すると、ユーザーは Beam をインストールしなくても、生成された後のデータセットを読むことができます。 - Python のクロージャには注意してください。パイプラインを実行する際、
beam.Map
とbeam.DoFn
関数は、pickle
を使ってシリアル化され、すべてのワーカーに送信されます。ワーカー間で状態を共有する必要がある場合は、beam.PTransform
内でオブジェクトをミュータブルにしないでください。 tfds.core.DatasetBuilder
が pickle でシリアル化される方法により、データ作成中、ワーカーでのtfds.core.DatasetBuilder
のミュート化は無視されます(_split_generators
でself.info.metadata['offset'] = 123
を設定し、beam.Map(lambda x: x + self.info.metadata['offset'])
のようにしてワーカーからそれにアクセスすることはできません)。- Split 間で一部のパイプラインステップを共有する櫃夜ぐあある場合は、追加の
pipeline: beam.Pipeline
kwarg を_split_generator
に追加して、生成パイプライン全体を制御することができます。tfds.core.GeneratorBasedBuilder
の_generate_examples
ドキュメントをご覧ください。
例
Beam データセットの例を以下に示します。
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
パイプラインの実行
パイプラインの実行には、上記のセクションをご覧ください。
注意: Beam 以外のデータセットと同様に、--register_checksums
でダウンロードチェックサムを必ず登録してください(ダウンロードを初めて登録する場合のみ)。
tfds build my_dataset --register_checksums
TFDS を入力として使用するパイプライン
TFDS データセットをソースとして取る Beam パイプラインを作成する場合は、tfds.beam.ReadFromTFDS
を使用できます。
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
データセットの各シャードを並行して処理します。
注意: これには、データベースがすでに生成されていることが必要です。Beam を使ってデータセットを生成するには、ほかのセクションをご覧ください。