W tym przewodniku opisano, jak używać interfejsu API TFX do budowania w pełni niestandardowego komponentu. W pełni niestandardowe komponenty umożliwiają budowanie komponentów poprzez zdefiniowanie specyfikacji komponentu, modułu wykonującego i klas interfejsu komponentu. Takie podejście pozwala ponownie wykorzystać i rozszerzyć standardowy komponent tak, aby odpowiadał Twoim potrzebom.
Jeśli nie masz doświadczenia z potokami TFX, dowiedz się więcej o podstawowych koncepcjach potoków TFX .
Niestandardowy moduł wykonawczy lub komponent niestandardowy
Jeśli potrzebna jest tylko niestandardowa logika przetwarzania, a wejścia, wyjścia i właściwości wykonawcze komponentu są takie same jak istniejącego komponentu, wystarczy niestandardowy moduł wykonujący. W pełni niestandardowy komponent jest potrzebny, gdy którekolwiek z wejść, wyników lub właściwości wykonania różnią się od jakichkolwiek istniejących komponentów TFX.
Jak utworzyć komponent niestandardowy?
Opracowanie w pełni niestandardowego komponentu wymaga:
- Zdefiniowany zestaw specyfikacji artefaktów wejściowych i wyjściowych dla nowego komponentu. W szczególności typy artefaktów wejściowych powinny być spójne z typami artefaktów wyjściowych komponentów, które generują artefakty, a typy artefaktów wyjściowych powinny być spójne z typami artefaktów wejściowych komponentów, które zużywają artefakty, jeśli takie istnieją.
- Parametry wykonawcze niebędące artefaktami, które są potrzebne dla nowego komponentu.
Specyfikacja komponentów
Klasa ComponentSpec
definiuje kontrakt komponentu, definiując artefakty wejściowe i wyjściowe komponentu, a także parametry używane do wykonania komponentu. Ma trzy części:
- WEJŚCIA : Słownik wpisanych parametrów dla artefaktów wejściowych, które są przekazywane do modułu wykonującego komponent. Zwykle artefakty wejściowe są wynikami z komponentów nadrzędnych, a zatem mają ten sam typ.
- WYJŚCIA : Słownik wpisanych parametrów dla artefaktów wyjściowych generowanych przez komponent.
- PARAMETRY : Słownik dodatkowych elementów ExecutionParameter , które zostaną przekazane do modułu wykonującego komponent. Są to parametry niebędące artefaktami, które chcemy elastycznie zdefiniować w potoku DSL i przekazać do wykonania.
Oto przykład specyfikacji komponentu:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
Wykonawca
Następnie napisz kod executora dla nowego komponentu. Zasadniczo należy utworzyć nową podklasę base_executor.BaseExecutor
z nadpisaną funkcją Do
W funkcji Do
argumenty input_dict
, output_dict
i exec_properties
przekazywane w mapowaniu odpowiednio do INPUTS
, OUTPUTS
i PARAMETERS
zdefiniowanych w ComponentSpec. W przypadku exec_properties
wartość można pobrać bezpośrednio poprzez przeszukiwanie słownika. W przypadku artefaktów w input_dict
i output_dict
w klasie artifact_utils dostępne są wygodne funkcje, których można użyć do pobrania instancji artefaktu lub identyfikatora uri artefaktu.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
Testowanie jednostkowe niestandardowego modułu wykonującego
Testy jednostkowe dla niestandardowego modułu wykonującego można utworzyć w podobny sposób .
Interfejs komponentowy
Teraz, gdy najbardziej złożona część jest już gotowa, następnym krokiem jest złożenie tych elementów w interfejs komponentu, aby umożliwić wykorzystanie komponentu w potoku. Istnieje kilka kroków:
- Ustaw interfejs komponentu jako podklasę
base_component.BaseComponent
- Przypisz zmienną klasy
SPEC_CLASS
z zdefiniowaną wcześniej klasąComponentSpec
- Przypisz zmienną klasy
EXECUTOR_SPEC
z klasą Executor, która została zdefiniowana wcześniej - Zdefiniuj funkcję konstruktora
__init__()
używając argumentów tej funkcji w celu skonstruowania instancji klasy ComponentSpec i wywołania superfunkcji z tą wartością wraz z opcjonalną nazwą
Po utworzeniu instancji komponentu zostanie wywołana logika sprawdzania typu w klasie base_component.BaseComponent
, aby upewnić się, że przekazane argumenty są zgodne z informacjami o typie zdefiniowanymi w klasie ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
Zmontuj w potok TFX
Ostatnim krokiem jest podłączenie nowego niestandardowego komponentu do potoku TFX. Oprócz dodania instancji nowego komponentu potrzebne są również:
- Prawidłowo podłącz do niego górne i dolne komponenty nowego komponentu. Odbywa się to poprzez odniesienie do wyników wyższego komponentu w nowym komponencie i odniesienie do wyników nowego komponentu w dalszych komponentach
- Dodaj nową instancję komponentu do listy komponentów podczas konstruowania potoku.
Poniższy przykład ilustruje powyższe zmiany. Pełny przykład można znaleźć w repozytorium TFX GitHub .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
Wdróż w pełni niestandardowy komponent
Oprócz zmian w kodzie, wszystkie nowo dodane części ( ComponentSpec
, Executor
, interfejs komponentu) muszą być dostępne w środowisku działającym potoku, aby potok działał prawidłowo.