TFX パイプラインをローカルで構築する

TFX を使用すると、次の目的で機械学習 (ML) ワークフローをパイプラインとして簡単に調整できます。

  • ML プロセスを自動化すると、モデルを定期的に再トレーニング、評価、デプロイできるようになります。
  • モデルのパフォーマンスの詳細な分析と、新しくトレーニングされたモデルの検証を含む ML パイプラインを作成して、パフォーマンスと信頼性を確保します。
  • トレーニング データの異常を監視し、トレーニングとサービスの偏りを排除します
  • さまざまなハイパーパラメーターのセットを使用してパイプラインを実行することで、実験の速度が向上します。

一般的なパイプライン開発プロセスは、運用環境にデプロイされる前に、ローカル マシンでデータ分析とコンポーネントのセットアップから始まります。このガイドでは、パイプラインをローカルに構築する 2 つの方法について説明します。

  • ML ワークフローのニーズに合わせて TFX パイプライン テンプレートをカスタマイズします。 TFX パイプライン テンプレートは、TFX 標準コンポーネントを使用したベスト プラクティスを示す事前構築されたワークフローです。
  • TFX を使用してパイプラインを構築します。この使用例では、テンプレートから開始せずにパイプラインを定義します。

パイプラインを開発しているときは、 LocalDagRunnerを使用してパイプラインを実行できます。次に、パイプライン コンポーネントが適切に定義され、テストされたら、Kubeflow や Airflow などの実稼働グレードのオーケストレーターを使用します。

始める前に

TFX は Python パッケージであるため、仮想環境や Docker コンテナなどの Python 開発環境をセットアップする必要があります。それから:

pip install tfx

TFX パイプラインを初めて使用する場合は、続行する前にTFX パイプラインの中心的な概念について詳しく学習してください

テンプレートを使用してパイプラインを構築する

TFX パイプライン テンプレートを使用すると、ユースケースに合わせてカスタマイズできる、事前に構築されたパイプライン定義のセットが提供されるため、パイプライン開発を簡単に開始できます。

次のセクションでは、テンプレートのコピーを作成し、ニーズに合わせてカスタマイズする方法について説明します。

パイプライン テンプレートのコピーを作成する

  1. 利用可能な TFX パイプライン テンプレートのリストを参照してください。

    tfx template list
    
  2. リストからテンプレートを選択します

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    以下を置き換えます。

    • template : コピーするテンプレートの名前。
    • pipeline-name : 作成するパイプラインの名前。
    • destination-path : テンプレートのコピー先のパス。

    tfx template copyコマンドについて詳しくは、こちらをご覧ください。

  3. 指定したパスにパイプライン テンプレートのコピーが作成されました。

パイプライン テンプレートを探索する

このセクションでは、テンプレートによって作成されるスキャフォールディングの概要を説明します。

  1. パイプラインのルート ディレクトリにコピーされたディレクトリとファイルを探索します。

    • パイプラインディレクトリ
      • pipeline.py - パイプラインを定義し、使用されているコンポーネントをリストします。
      • configs.py - データの取得元や使用されているオーケストレーターなどの構成の詳細を保持します。
    • データディレクトリ
      • これには通常、 ExampleGenのデフォルトのソースであるdata.csvファイルが含まれています。 configs.pyでデータ ソースを変更できます。
    • 前処理コードとモデル実装を含むモデルディレクトリ

    • テンプレートは、ローカル環境と Kubeflow の DAG ランナーをコピーします。

    • 一部のテンプレートには Python ノートブックも含まれているため、Machine Learning MetaData を使用してデータとアーティファクトを探索できます。

  2. パイプライン ディレクトリで次のコマンドを実行します。

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    このコマンドはLocalDagRunnerを使用してパイプライン実行を作成し、次のディレクトリをパイプラインに追加します。

    • ローカルで使用される ML メタデータ ストアを含むtfx_metadataディレクトリ。
    • パイプラインのファイル出力を含むtfx_pipeline_outputディレクトリ。
  3. パイプラインのpipeline/configs.pyファイルを開いて、内容を確認します。このスクリプトは、パイプラインおよびコンポーネント関数で使用される構成オプションを定義します。ここで、データソースの場所や実行内のトレーニング ステップ数などを指定します。

  4. パイプラインのpipeline/pipeline.pyファイルを開いて、内容を確認します。このスクリプトは TFX パイプラインを作成します。最初、パイプラインにはExampleGenコンポーネントのみが含まれています。

    • pipeline.pyTODOコメントの指示に従って、パイプラインにさらにステップを追加します。
  5. local_runner.pyファイルを開いて内容を確認します。このスクリプトはパイプライン実行を作成し、 data_pathpreprocessing_fnなどの実行のパラメーターを指定します。

  6. テンプレートによって作成されたスキャフォールディングを確認し、 LocalDagRunnerを使用してパイプライン実行を作成しました。次に、要件に合わせてテンプレートをカスタマイズします。

パイプラインをカスタマイズする

このセクションでは、テンプレートのカスタマイズを開始する方法の概要を説明します。

  1. パイプラインを設計します。テンプレートが提供するスキャフォールディングは、TFX 標準コンポーネントを使用して表形式データのパイプラインを実装するのに役立ちます。既存の ML ワークフローをパイプラインに移行する場合は、 TFX 標準コンポーネントを最大限に活用するようにコードを修正する必要がある場合があります。また、ワークフローに固有の機能や、TFX 標準コンポーネントではまだサポートされていない機能を実装するカスタム コンポーネントを作成する必要がある場合もあります。

  2. パイプラインを設計したら、次のプロセスを使用してパイプラインを繰り返しカスタマイズします。パイプラインにデータを取り込むコンポーネントから開始します。これは通常、 ExampleGenコンポーネントです。

    1. ユースケースに合わせてパイプラインまたはコンポーネントをカスタマイズします。これらのカスタマイズには次のような変更が含まれる場合があります。

      • パイプラインパラメータの変更。
      • パイプラインへのコンポーネントの追加または削除。
      • データ入力ソースを置き換えます。このデータ ソースは、ファイルまたは BigQuery などのサービスへのクエリのいずれかになります。
      • パイプライン内のコンポーネントの構成を変更します。
      • コンポーネントのカスタマイズ機能を変更します。
    2. local_runner.pyスクリプトを使用してコンポーネントをローカルで実行するか、別のオーケストレーターを使用している場合は別の適切な DAG ランナーを使用します。スクリプトが失敗した場合は、失敗をデバッグし、スクリプトの実行を再試行します。

    3. このカスタマイズが機能したら、次のカスタマイズに進みます。

  3. 繰り返し作業することで、ニーズに合わせてテンプレート ワークフローの各ステップをカスタマイズできます。

カスタムパイプラインを構築する

テンプレートを使用せずにカスタム パイプラインを構築する方法について詳しくは、次の手順を使用してください。

  1. パイプラインを設計します。 TFX 標準コンポーネントは、完全な ML ワークフローの実装に役立つ実証済みの機能を提供します。既存の ML ワークフローをパイプラインに移行する場合は、TFX 標準コンポーネントを最大限に活用するためにコードを修正する必要がある場合があります。データ拡張などの機能を実装するカスタム コンポーネントの作成が必要になる場合もあります。

  2. 次の例を使用して、パイプラインを定義するスクリプト ファイルを作成します。このガイドでは、このファイルをmy_pipeline.pyと呼びます。

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    次の手順では、 create_pipelineでパイプラインを定義し、ローカル ランナーを使用してパイプラインをローカルで実行します。

    次のプロセスを使用してパイプラインを繰り返し構築します。

    1. ユースケースに合わせてパイプラインまたはコンポーネントをカスタマイズします。これらのカスタマイズには次のような変更が含まれる場合があります。

      • パイプラインパラメータの変更。
      • パイプラインへのコンポーネントの追加または削除。
      • データ入力ファイルを置き換えます。
      • パイプライン内のコンポーネントの構成を変更します。
      • コンポーネントのカスタマイズ機能を変更します。
    2. ローカル ランナーを使用するか、スクリプトを直接実行して、コンポーネントをローカルで実行します。スクリプトが失敗した場合は、失敗をデバッグし、スクリプトの実行を再試行します。

    3. このカスタマイズが機能したら、次のカスタマイズに進みます。

    パイプラインのワークフローの最初のノードから開始します。通常、最初のノードがパイプラインにデータを取り込みます。

  3. ワークフローの最初のノードをパイプラインに追加します。この例では、パイプラインはExampleGen標準コンポーネントを使用して、 ./dataのディレクトリから CSV を読み込みます。

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen指定されたデータ パスにある CSV 内のデータを使用して、シリアル化されたサンプル レコードを作成します。 CsvExampleGenコンポーネントのinput_baseパラメーターをデータ ルートで設定します。

  4. my_pipeline.pyと同じディレクトリにdataディレクトリを作成します。小さな CSV ファイルをdataディレクトリに追加します。

  5. 次のコマンドを使用して、 my_pipeline.pyスクリプトを実行します。

    python my_pipeline.py
    

    結果は次のようになります。

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. コンポーネントをパイプラインに繰り返し追加し続けます。