TFX ułatwia orkiestrację przepływu pracy w uczeniu maszynowym (ML) jako potoku, aby:
- Zautomatyzuj proces uczenia maszynowego, który umożliwia regularne ponowne szkolenie, ocenę i wdrażanie modelu.
- Twórz potoki uczenia maszynowego, które obejmują głęboką analizę wydajności modelu i weryfikację nowo wyszkolonych modeli, aby zapewnić wydajność i niezawodność.
- Monitoruj dane szkoleniowe pod kątem anomalii i eliminuj odchylenia w obsłudze szkoleń
- Zwiększ prędkość eksperymentowania, uruchamiając potok z różnymi zestawami hiperparametrów.
Typowy proces opracowywania potoku rozpoczyna się na komputerze lokalnym, od analizy danych i konfiguracji komponentów, przed wdrożeniem do produkcji. W tym przewodniku opisano dwa sposoby lokalnego budowania potoku.
- Dostosuj szablon potoku TFX, aby dopasować go do potrzeb przepływu pracy ML. Szablony potoków TFX to wstępnie zbudowane przepływy pracy, które demonstrują najlepsze praktyki przy użyciu standardowych komponentów TFX.
- Zbuduj potok za pomocą TFX. W tym przypadku definiujesz potok bez rozpoczynania od szablonu.
Podczas opracowywania potoku możesz go uruchomić za pomocą LocalDagRunner
. Następnie, gdy komponenty potoku zostaną dobrze zdefiniowane i przetestowane, można użyć koordynatora klasy produkcyjnej, takiego jak Kubeflow lub Airflow.
Zanim zaczniesz
TFX to pakiet Pythona, więc będziesz musiał skonfigurować środowisko programistyczne Pythona, takie jak środowisko wirtualne lub kontener Docker. Następnie:
pip install tfx
Jeśli nie masz doświadczenia z potokami TFX, zanim przejdziesz dalej , dowiedz się więcej o podstawowych koncepcjach potoków TFX .
Utwórz potok przy użyciu szablonu
Szablony potoków TFX ułatwiają rozpoczęcie opracowywania potoków, udostępniając wstępnie utworzony zestaw definicji potoków, które można dostosować do swojego przypadku użycia.
W poniższych sekcjach opisano, jak utworzyć kopię szablonu i dostosować ją do własnych potrzeb.
Utwórz kopię szablonu potoku
Zobacz listę dostępnych szablonów potoków TFX:
tfx template list
Wybierz szablon z listy
tfx template copy --model=template --pipeline_name=pipeline-name \ --destination_path=destination-path
Zastąp następujące elementy:
- template : nazwa szablonu, który chcesz skopiować.
- pipeline-name : Nazwa potoku, który ma zostać utworzony.
- destination-path : Ścieżka, do której ma zostać skopiowany szablon.
Dowiedz się więcej o poleceniu
tfx template copy
.Kopia szablonu potoku została utworzona w określonej ścieżce.
Zapoznaj się z szablonem potoku
Ta sekcja zawiera przegląd rusztowania utworzonego za pomocą szablonu.
Przeglądaj katalogi i pliki, które zostały skopiowane do katalogu głównego potoku
- Katalog potoków z
-
pipeline.py
- definiuje potok i wyświetla listę używanych komponentów -
configs.py
— przechowuj szczegóły konfiguracji, takie jak źródło pochodzenia danych lub używany program Orchestrator
-
- Katalog danych
- Zwykle zawiera plik
data.csv
, który jest domyślnym źródłem dlaExampleGen
. Możesz zmienić źródło danych wconfigs.py
.
- Zwykle zawiera plik
Katalog modeli zawierający kod przetwarzania wstępnego i implementacje modeli
Szablon kopiuje moduły DAG dla środowiska lokalnego i Kubeflow.
Niektóre szablony zawierają także notesy języka Python, dzięki którym można eksplorować dane i artefakty za pomocą metadanych uczenia maszynowego.
- Katalog potoków z
Uruchom następujące polecenia w katalogu potoku:
tfx pipeline create --pipeline_path local_runner.py
tfx run create --pipeline_name pipeline_name
Polecenie tworzy uruchomienie potoku przy użyciu
LocalDagRunner
, który dodaje do potoku następujące katalogi:- Katalog tfx_metadata zawierający magazyn metadanych ML używany lokalnie.
- Katalog tfx_pipeline_output , który zawiera pliki wyjściowe potoku.
Otwórz plik potoku
pipeline/configs.py
i przejrzyj zawartość. Ten skrypt definiuje opcje konfiguracyjne używane przez potok i funkcje komponentu. W tym miejscu można określić takie elementy, jak lokalizacja źródła danych lub liczba kroków szkoleniowych w przebiegu.Otwórz plik potoku
pipeline/pipeline.py
i przejrzyj jego zawartość. Ten skrypt tworzy potok TFX. Początkowo potok zawiera tylko komponentExampleGen
.- Postępuj zgodnie z instrukcjami zawartymi w komentarzach TODO w
pipeline.py
, aby dodać więcej kroków do potoku.
- Postępuj zgodnie z instrukcjami zawartymi w komentarzach TODO w
Otwórz plik
local_runner.py
i przejrzyj zawartość. Ten skrypt tworzy uruchomienie potoku i określa parametry przebiegu, takie jakdata_path
ipreprocessing_fn
.Przejrzałeś rusztowanie utworzone przez szablon i utworzyłeś uruchomienie potoku przy użyciu
LocalDagRunner
. Następnie dostosuj szablon do swoich wymagań.
Dostosuj swój rurociąg
W tej sekcji omówiono, jak rozpocząć dostosowywanie szablonu.
Zaprojektuj swój rurociąg. Rusztowanie udostępniane przez szablon pomaga zaimplementować potok dla danych tabelarycznych przy użyciu standardowych komponentów TFX. Jeśli przenosisz istniejący przepływ pracy ML do potoku, może być konieczna zmiana kodu, aby w pełni wykorzystać standardowe komponenty TFX . Może być również konieczne utworzenie niestandardowych komponentów , które implementują funkcje, które są unikalne dla Twojego przepływu pracy lub które nie są jeszcze obsługiwane przez standardowe komponenty TFX.
Po zaprojektowaniu potoku iteracyjnie dostosuj potok, korzystając z poniższego procesu. Zacznij od komponentu, który pozyskuje dane do potoku, którym jest zazwyczaj komponent
ExampleGen
.Dostosuj potok lub komponent do swojego przypadku użycia. Te dostosowania mogą obejmować zmiany takie jak:
- Zmiana parametrów rurociągu.
- Dodawanie komponentów do rurociągu lub ich usuwanie.
- Wymiana źródła wprowadzania danych. Tym źródłem danych może być plik lub zapytania do usług takich jak BigQuery.
- Zmiana konfiguracji komponentu w potoku.
- Zmiana funkcji dostosowywania komponentu.
Uruchom komponent lokalnie, używając skryptu
local_runner.py
lub innego odpowiedniego modułu uruchamiającego DAG, jeśli używasz innego koordynatora. Jeśli skrypt nie powiedzie się, usuń błąd i spróbuj ponownie uruchomić skrypt.Gdy to dostosowanie zacznie działać, przejdź do następnego dostosowania.
Pracując iteracyjnie, możesz dostosować każdy krok przepływu pracy szablonu do swoich potrzeb.
Utwórz niestandardowy potok
Skorzystaj z poniższych instrukcji, aby dowiedzieć się więcej na temat tworzenia niestandardowego potoku bez użycia szablonu.
Zaprojektuj swój rurociąg. Standardowe komponenty TFX zapewniają sprawdzoną funkcjonalność, która pomaga wdrożyć kompletny przepływ pracy ML. Jeśli przenosisz istniejący przepływ pracy ML do potoku, może być konieczna zmiana kodu, aby w pełni wykorzystać standardowe komponenty TFX. Może być również konieczne utworzenie niestandardowych komponentów , które implementują takie funkcje, jak powiększanie danych.
- Dowiedz się więcej o standardowych komponentach TFX .
- Dowiedz się więcej o komponentach niestandardowych .
Utwórz plik skryptu, aby zdefiniować potok, korzystając z poniższego przykładu. W tym przewodniku ten plik jest określany jako
my_pipeline.py
.import os from typing import Optional, Text, List from absl import logging from ml_metadata.proto import metadata_store_pb2 import tfx.v1 as tfx PIPELINE_NAME = 'my_pipeline' PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output') METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db') ENABLE_CACHE = True def create_pipeline( pipeline_name: Text, pipeline_root:Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline) if __name__ == '__main__': logging.set_verbosity(logging.INFO) run_pipeline()
W kolejnych krokach zdefiniujesz potok w
create_pipeline
i uruchomisz go lokalnie za pomocą lokalnego modułu uruchamiającego.Iteracyjnie utwórz potok, korzystając z następującego procesu.
Dostosuj potok lub komponent do swojego przypadku użycia. Te dostosowania mogą obejmować zmiany takie jak:
- Zmiana parametrów rurociągu.
- Dodawanie komponentów do rurociągu lub ich usuwanie.
- Zastępowanie pliku wejściowego danych.
- Zmiana konfiguracji komponentu w potoku.
- Zmiana funkcji dostosowywania komponentu.
Uruchom komponent lokalnie, korzystając z lokalnego modułu uruchamiającego lub bezpośrednio uruchamiając skrypt. Jeśli skrypt nie powiedzie się, usuń błąd i spróbuj ponownie uruchomić skrypt.
Gdy to dostosowanie zacznie działać, przejdź do następnego dostosowania.
Zacznij od pierwszego węzła w przepływie pracy potoku. Zazwyczaj pierwszy węzeł pozyskuje dane do potoku.
Dodaj pierwszy węzeł przepływu pracy do potoku. W tym przykładzie potok używa standardowego komponentu
ExampleGen
do ładowania pliku CSV z katalogu./data
.from tfx.components import CsvExampleGen DATA_PATH = os.path.join('.', 'data') def create_pipeline( pipeline_name: Text, pipeline_root:Text, data_path: Text, enable_cache: bool, metadata_connection_config: Optional[ metadata_store_pb2.ConnectionConfig] = None, beam_pipeline_args: Optional[List[Text]] = None ): components = [] example_gen = tfx.components.CsvExampleGen(input_base=data_path) components.append(example_gen) return tfx.dsl.Pipeline( pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components, enable_cache=enable_cache, metadata_connection_config=metadata_connection_config, beam_pipeline_args=beam_pipeline_args, ) def run_pipeline(): my_pipeline = create_pipeline( pipeline_name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT, data_path=DATA_PATH, enable_cache=ENABLE_CACHE, metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH) ) tfx.orchestration.LocalDagRunner().run(my_pipeline)
CsvExampleGen
tworzy serializowane rekordy przykładowe przy użyciu danych w pliku CSV w określonej ścieżce danych. Ustawiając parametrinput_base
komponentuCsvExampleGen
w katalogu głównym danych.Utwórz katalog
data
w tym samym katalogu comy_pipeline.py
. Dodaj mały plik CSV do katalogudata
.Użyj następującego polecenia, aby uruchomić skrypt
my_pipeline.py
.python my_pipeline.py
Wynik powinien wyglądać mniej więcej tak:
INFO:absl:Component CsvExampleGen depends on []. INFO:absl:Component CsvExampleGen is scheduled. INFO:absl:Component CsvExampleGen is running. INFO:absl:Running driver for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Running executor for CsvExampleGen INFO:absl:Generating examples. INFO:absl:Using 1 process(es) for Local pipeline execution. INFO:absl:Processing input csv data ./data/* to TFExample. WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Running publisher for CsvExampleGen INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished.
Kontynuuj iteracyjne dodawanie komponentów do potoku.