Tworzenie w pełni niestandardowych komponentów

W tym przewodniku opisano, jak używać interfejsu API TFX do budowania w pełni niestandardowego komponentu. W pełni niestandardowe komponenty umożliwiają budowanie komponentów poprzez zdefiniowanie specyfikacji komponentu, modułu wykonującego i klas interfejsu komponentu. Takie podejście pozwala ponownie wykorzystać i rozszerzyć standardowy komponent tak, aby odpowiadał Twoim potrzebom.

Jeśli nie masz doświadczenia z potokami TFX, dowiedz się więcej o podstawowych koncepcjach potoków TFX .

Niestandardowy moduł wykonawczy lub komponent niestandardowy

Jeśli potrzebna jest tylko niestandardowa logika przetwarzania, a wejścia, wyjścia i właściwości wykonawcze komponentu są takie same jak istniejącego komponentu, wystarczy niestandardowy moduł wykonujący. W pełni niestandardowy komponent jest potrzebny, gdy którekolwiek z wejść, wyników lub właściwości wykonania różnią się od jakichkolwiek istniejących komponentów TFX.

Jak utworzyć komponent niestandardowy?

Opracowanie w pełni niestandardowego komponentu wymaga:

  • Zdefiniowany zestaw specyfikacji artefaktów wejściowych i wyjściowych dla nowego komponentu. W szczególności typy artefaktów wejściowych powinny być spójne z typami artefaktów wyjściowych komponentów, które generują artefakty, a typy artefaktów wyjściowych powinny być spójne z typami artefaktów wejściowych komponentów, które zużywają artefakty, jeśli takie istnieją.
  • Parametry wykonawcze niebędące artefaktami, które są potrzebne dla nowego komponentu.

Specyfikacja komponentów

Klasa ComponentSpec definiuje kontrakt komponentu, definiując artefakty wejściowe i wyjściowe komponentu, a także parametry używane do wykonania komponentu. Ma trzy części:

  • WEJŚCIA : Słownik wpisanych parametrów dla artefaktów wejściowych, które są przekazywane do modułu wykonującego komponent. Zwykle artefakty wejściowe są wynikami z komponentów nadrzędnych, a zatem mają ten sam typ.
  • WYJŚCIA : Słownik wpisanych parametrów dla artefaktów wyjściowych generowanych przez komponent.
  • PARAMETRY : Słownik dodatkowych elementów ExecutionParameter , które zostaną przekazane do modułu wykonującego komponent. Są to parametry niebędące artefaktami, które chcemy elastycznie zdefiniować w potoku DSL i przekazać do wykonania.

Oto przykład specyfikacji komponentu:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Wykonawca

Następnie napisz kod executora dla nowego komponentu. Zasadniczo należy utworzyć nową podklasę base_executor.BaseExecutor z nadpisaną funkcją Do W funkcji Do argumenty input_dict , output_dict i exec_properties przekazywane w mapowaniu odpowiednio do INPUTS , OUTPUTS i PARAMETERS zdefiniowanych w ComponentSpec. W przypadku exec_properties wartość można pobrać bezpośrednio poprzez przeszukiwanie słownika. W przypadku artefaktów w input_dict i output_dict w klasie artifact_utils dostępne są wygodne funkcje, których można użyć do pobrania instancji artefaktu lub identyfikatora uri artefaktu.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Testowanie jednostkowe niestandardowego modułu wykonującego

Testy jednostkowe dla niestandardowego modułu wykonującego można utworzyć w podobny sposób .

Interfejs komponentowy

Teraz, gdy najbardziej złożona część jest już gotowa, następnym krokiem jest złożenie tych elementów w interfejs komponentu, aby umożliwić wykorzystanie komponentu w potoku. Istnieje kilka kroków:

  • Ustaw interfejs komponentu jako podklasę base_component.BaseComponent
  • Przypisz zmienną klasy SPEC_CLASS z zdefiniowaną wcześniej klasą ComponentSpec
  • Przypisz zmienną klasy EXECUTOR_SPEC z klasą Executor, która została zdefiniowana wcześniej
  • Zdefiniuj funkcję konstruktora __init__() używając argumentów tej funkcji w celu skonstruowania instancji klasy ComponentSpec i wywołania superfunkcji z tą wartością wraz z opcjonalną nazwą

Po utworzeniu instancji komponentu zostanie wywołana logika sprawdzania typu w klasie base_component.BaseComponent , aby upewnić się, że przekazane argumenty są zgodne z informacjami o typie zdefiniowanymi w klasie ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Zmontuj w potok TFX

Ostatnim krokiem jest podłączenie nowego niestandardowego komponentu do potoku TFX. Oprócz dodania instancji nowego komponentu potrzebne są również:

  • Prawidłowo podłącz do niego górne i dolne komponenty nowego komponentu. Odbywa się to poprzez odniesienie do wyników komponentu wyższego szczebla w nowym komponencie i odniesienie do wyników nowego komponentu w komponentach dalszych
  • Dodaj nową instancję komponentu do listy komponentów podczas konstruowania potoku.

Poniższy przykład ilustruje powyższe zmiany. Pełny przykład można znaleźć w repozytorium TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Wdróż w pełni niestandardowy komponent

Oprócz zmian w kodzie, wszystkie nowo dodane części ( ComponentSpec , Executor , interfejs komponentu) muszą być dostępne w środowisku działającym potoku, aby potok działał prawidłowo.