TFX を使用すると、次の目的で機械学習 (ML) ワークフローをパイプラインとして簡単に調整できます。
- ML プロセスを自動化すると、モデルを定期的に再トレーニング、評価、デプロイできるようになります。
- モデルのパフォーマンスの詳細な分析と、新しくトレーニングされたモデルの検証を含む ML パイプラインを作成して、パフォーマンスと信頼性を確保します。
- トレーニング データの異常を監視し、トレーニングとサービスの偏りを排除します
- さまざまなハイパーパラメーターのセットを使用してパイプラインを実行することで、実験の速度が向上します。
一般的なパイプライン開発プロセスは、運用環境にデプロイされる前に、ローカル マシンでデータ分析とコンポーネントのセットアップから始まります。このガイドでは、パイプラインをローカルに構築する 2 つの方法について説明します。
- ML ワークフローのニーズに合わせて TFX パイプライン テンプレートをカスタマイズします。 TFX パイプライン テンプレートは、TFX 標準コンポーネントを使用したベスト プラクティスを示す事前構築されたワークフローです。
- TFX を使用してパイプラインを構築します。この使用例では、テンプレートから開始せずにパイプラインを定義します。
パイプラインを開発しているときは、 LocalDagRunner
を使用してパイプラインを実行できます。次に、パイプライン コンポーネントが適切に定義され、テストされたら、Kubeflow や Airflow などの実稼働グレードのオーケストレーターを使用します。
始める前に
TFX は Python パッケージであるため、仮想環境や Docker コンテナなどの Python 開発環境をセットアップする必要があります。それから:
pip install tfx
TFX パイプラインを初めて使用する場合は、続行する前にTFX パイプラインの中心的な概念について詳しく学習してください。
テンプレートを使用してパイプラインを構築する
TFX パイプライン テンプレートを使用すると、ユースケースに合わせてカスタマイズできる、事前に構築されたパイプライン定義のセットが提供されるため、パイプライン開発を簡単に開始できます。
次のセクションでは、テンプレートのコピーを作成し、ニーズに合わせてカスタマイズする方法について説明します。
パイプライン テンプレートのコピーを作成する
利用可能な TFX パイプライン テンプレートのリストを参照してください。
tfx template list
リストからテンプレートを選択します
tfx template copy --model=template --pipeline_name=pipeline-name \ --destination_path=destination-path
以下を置き換えます。
- template : コピーするテンプレートの名前。
- pipeline-name : 作成するパイプラインの名前。
- destination-path : テンプレートのコピー先のパス。
tfx template copy
コマンドについて詳しくは、こちらをご覧ください。指定したパスにパイプライン テンプレートのコピーが作成されました。
パイプライン テンプレートを探索する
このセクションでは、テンプレートによって作成されるスキャフォールディングの概要を説明します。
パイプラインのルート ディレクトリにコピーされたディレクトリとファイルを探索します。
- パイプラインディレクトリ
pipeline.py
- パイプラインを定義し、使用されているコンポーネントをリストします。-
configs.py
- データの取得元や使用されているオーケストレーターなどの構成の詳細を保持します。
- データディレクトリ
- これには通常、
ExampleGen
のデフォルトのソースであるdata.csv
ファイルが含まれています。configs.py
でデータ ソースを変更できます。
- これには通常、
前処理コードとモデル実装を含むモデルディレクトリ
テンプレートは、ローカル環境と Kubeflow の DAG ランナーをコピーします。
一部のテンプレートには Python ノートブックも含まれているため、Machine Learning MetaData を使用してデータとアーティファクトを探索できます。
- パイプラインディレクトリ
パイプライン ディレクトリで次のコマンドを実行します。
tfx pipeline create --pipeline_path local_runner.py
tfx run create --pipeline_name pipeline_name
このコマンドは
LocalDagRunner
を使用してパイプライン実行を作成し、次のディレクトリをパイプラインに追加します。- ローカルで使用される ML メタデータ ストアを含むtfx_metadataディレクトリ。
- パイプラインのファイル出力を含むtfx_pipeline_outputディレクトリ。
パイプラインの
pipeline/configs.py
ファイルを開いて、内容を確認します。このスクリプトは、パイプラインおよびコンポーネント関数で使用される構成オプションを定義します。ここで、データソースの場所や実行内のトレーニング ステップ数などを指定します。パイプラインの
pipeline/pipeline.py
ファイルを開いて、内容を確認します。このスクリプトは TFX パイプラインを作成します。最初、パイプラインにはExampleGen
コンポーネントのみが含まれています。-
pipeline.py
のTODOコメントの指示に従って、パイプラインにさらにステップを追加します。
-
local_runner.py
ファイルを開いて内容を確認します。このスクリプトはパイプライン実行を作成し、data_path
やpreprocessing_fn
などの実行パラメータを指定します。テンプレートによって作成されたスキャフォールディングを確認し、
LocalDagRunner
を使用してパイプライン実行を作成しました。次に、要件に合わせてテンプレートをカスタマイズします。
パイプラインをカスタマイズする
このセクションでは、テンプレートのカスタマイズを開始する方法の概要を説明します。
パイプラインを設計します。テンプレートが提供するスキャフォールディングは、TFX 標準コンポーネントを使用して表形式データのパイプラインを実装するのに役立ちます。既存の ML ワークフローをパイプラインに移行する場合は、 TFX 標準コンポーネントを最大限に活用するようにコードを修正する必要がある場合があります。また、ワークフローに固有の機能や、TFX 標準コンポーネントではまだサポートされていない機能を実装するカスタム コンポーネントを作成する必要がある場合もあります。
パイプラインを設計したら、次のプロセスを使用してパイプラインを繰り返しカスタマイズします。パイプラインにデータを取り込むコンポーネントから開始します。これは通常、
ExampleGen
コンポーネントです。ユースケースに合わせてパイプラインまたはコンポーネントをカスタマイズします。これらのカスタマイズには次のような変更が含まれる場合があります。
- パイプラインパラメータの変更。
- パイプラインへのコンポーネントの追加または削除。
- データ入力ソースを置き換えます。このデータ ソースは、ファイルまたは BigQuery などのサービスへのクエリのいずれかになります。
- パイプライン内のコンポーネントの構成を変更します。
- コンポーネントのカスタマイズ機能を変更します。
local_runner.py
スクリプトを使用してコンポーネントをローカルで実行するか、別のオーケストレーターを使用している場合は別の適切な DAG ランナーを使用します。スクリプトが失敗した場合は、失敗をデバッグし、スクリプトの実行を再試行します。このカスタマイズが機能したら、次のカスタマイズに進みます。
繰り返し作業することで、ニーズに合わせてテンプレート ワークフローの各ステップをカスタマイズできます。
カスタムパイプラインを構築する
テンプレートを使用せずにカスタム パイプラインを構築する方法について詳しくは、次の手順を使用してください。
パイプラインを設計します。 TFX 標準コンポーネントは、完全な ML ワークフローの実装に役立つ実証済みの機能を提供します。既存の ML ワークフローをパイプラインに移行する場合は、TFX 標準コンポーネントを最大限に活用するためにコードを修正する必要がある場合があります。データ拡張などの機能を実装するカスタム コンポーネントの作成が必要になる場合もあります。
- 標準の TFX コンポーネントについて詳しくは、こちらをご覧ください。
- カスタム コンポーネントの詳細については、こちらをご覧ください。
次の例を使用して、パイプラインを定義するスクリプト ファイルを作成します。このガイドでは、このファイルを
my_pipeline.py
と呼びます。import os from typing import Optional, Text, List from absl import logging from ml_metadata.proto import metadata_store_pb2 import tfx.v1 as tfx PIPELINE_NAME = 'my_pipeline' PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output') METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db') ENABLE_CACHE = True def create_pipeline( pipeline_name: Text, pipeline_root:Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, <!-- needed? --> ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline) if __name__ == '__main__': logging.set_verbosity(logging.INFO) run_pipeline()
次の手順では、
create_pipeline
でパイプラインを定義し、ローカル ランナーを使用してパイプラインをローカルで実行します。次のプロセスを使用してパイプラインを繰り返し構築します。
ユースケースに合わせてパイプラインまたはコンポーネントをカスタマイズします。これらのカスタマイズには、次のような変更が含まれる場合があります。
- パイプラインパラメータの変更。
- パイプラインへのコンポーネントの追加または削除。
- データ入力ファイルを置き換えます。
- パイプライン内のコンポーネントの構成を変更します。
- コンポーネントのカスタマイズ機能を変更します。
ローカル ランナーを使用するか、スクリプトを直接実行して、コンポーネントをローカルで実行します。スクリプトが失敗した場合は、失敗をデバッグし、スクリプトの実行を再試行します。
このカスタマイズが機能したら、次のカスタマイズに進みます。
パイプラインのワークフローの最初のノードから開始します。通常、最初のノードがパイプラインにデータを取り込みます。
ワークフローの最初のノードをパイプラインに追加します。この例では、パイプラインは
ExampleGen
標準コンポーネントを使用して、./data
のディレクトリから CSV をロードします。from tfx.components import CsvExampleGen DATA_PATH = os.path.join('.', 'data') def create_pipeline( pipeline_name: Text, pipeline_root:Text, data_path: Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] example_gen = tfx.components.CsvExampleGen(input_base=data_path) components.append(example_gen) return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, <!-- needed? --> ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, data_path=DATA_PATH, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline)
CsvExampleGen
指定されたデータ パスにある CSV 内のデータを使用して、シリアル化されたサンプル レコードを作成します。CsvExampleGen
コンポーネントのinput_base
パラメーターをデータ ルートで設定します。my_pipeline.py
と同じディレクトリにdata
ディレクトリを作成します。小さな CSV ファイルをdata
ディレクトリに追加します。次のコマンドを使用して、
my_pipeline.py
スクリプトを実行します。python my_pipeline.py
結果は次のようになります。
INFO:absl:Component CsvExampleGen depends on []. INFO:absl:Component CsvExampleGen is scheduled. INFO:absl:Component CsvExampleGen is running. INFO:absl:Running driver for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Running executor for CsvExampleGen INFO:absl:Generating examples. INFO:absl:Using 1 process(es) for Local pipeline execution. INFO:absl:Processing input csv data ./data/* to TFExample. WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Running publisher for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished.
コンポーネントをパイプラインに繰り返し追加し続けます。