Esta guía describe cómo utilizar la API TFX para crear un componente totalmente personalizado. Los componentes totalmente personalizados le permiten crear componentes definiendo la especificación del componente, el ejecutor y las clases de interfaz del componente. Este enfoque le permite reutilizar y ampliar un componente estándar para adaptarlo a sus necesidades.
Si es nuevo en las canalizaciones TFX, obtenga más información sobre los conceptos centrales de las canalizaciones TFX .
Ejecutor personalizado o componente personalizado
Si solo se necesita una lógica de procesamiento personalizada mientras las entradas, salidas y propiedades de ejecución del componente son las mismas que las de un componente existente, un ejecutor personalizado es suficiente. Se necesita un componente totalmente personalizado cuando cualquiera de las entradas, salidas o propiedades de ejecución son diferentes de cualquier componente TFX existente.
¿Cómo crear un componente personalizado?
Desarrollar un componente totalmente personalizado requiere:
- Un conjunto definido de especificaciones de artefactos de entrada y salida para el nuevo componente. Especialmente, los tipos de artefactos de entrada deben ser coherentes con los tipos de artefactos de salida de los componentes que producen los artefactos y los tipos de artefactos de salida deben ser coherentes con los tipos de artefactos de entrada de los componentes que consumen los artefactos, si los hay.
- Los parámetros de ejecución que no son artefactos necesarios para el nuevo componente.
Especificaciones del componente
La clase ComponentSpec
define el contrato del componente definiendo los artefactos de entrada y salida de un componente, así como los parámetros que se utilizan para la ejecución del componente. Tiene tres partes:
- ENTRADAS : un diccionario de parámetros escritos para los artefactos de entrada que se pasan al ejecutor del componente. Normalmente, los artefactos de entrada son las salidas de los componentes anteriores y, por lo tanto, comparten el mismo tipo.
- SALIDAS : un diccionario de parámetros escritos para los artefactos de salida que produce el componente.
- PARAMETERS : un diccionario de elementos ExecutionParameter adicionales que se pasarán al ejecutor del componente. Estos son parámetros que no son artefactos y que queremos definir de manera flexible en la canalización DSL y pasar a ejecución.
A continuación se muestra un ejemplo de ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
Ejecutor
A continuación, escriba el código ejecutor para el nuevo componente. Básicamente, se debe crear una nueva subclase de base_executor.BaseExecutor
con su función Do
anulada. En la función Do
, los argumentos input_dict
, output_dict
y exec_properties
que se pasan en se asignan a INPUTS
, OUTPUTS
y PARAMETERS
que se definen en ComponentSpec respectivamente. Para exec_properties
, el valor se puede recuperar directamente mediante una búsqueda en el diccionario. Para los artefactos en input_dict
y output_dict
, hay funciones convenientes disponibles en la clase artefacto_utils que se pueden usar para recuperar la instancia del artefacto o el uri del artefacto.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
Unidad de prueba de un ejecutor personalizado
Se pueden crear pruebas unitarias para el ejecutor personalizado de forma similar a esta .
Interfaz de componente
Ahora que la pieza más compleja está completa, el siguiente paso es ensamblar estas piezas en una interfaz de componente, para permitir que el componente se utilice en una tubería. Hay varios pasos:
- Haga que la interfaz del componente sea una subclase de
base_component.BaseComponent
- Asigne una variable de clase
SPEC_CLASS
con la claseComponentSpec
que se definió anteriormente - Asigne una variable de clase
EXECUTOR_SPEC
con la clase Ejecutor que se definió anteriormente - Defina la función constructora
__init__()
usando los argumentos de la función para construir una instancia de la clase ComponentSpec e invocar la superfunción con ese valor, junto con un nombre opcional.
Cuando se crea una instancia del componente, se invocará la lógica de verificación de tipos en la clase base_component.BaseComponent
para garantizar que los argumentos que se pasaron sean compatibles con la información de tipo definida en la clase ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
Montar en una tubería TFX
El último paso es conectar el nuevo componente personalizado a una canalización TFX. Además de agregar una instancia del nuevo componente, también se necesita lo siguiente:
- Conecte correctamente los componentes ascendentes y descendentes del nuevo componente. Esto se hace haciendo referencia a las salidas del componente ascendente en el nuevo componente y haciendo referencia a las salidas del nuevo componente en los componentes descendentes.
- Agregue la nueva instancia del componente a la lista de componentes al construir la canalización.
El siguiente ejemplo destaca los cambios antes mencionados. El ejemplo completo se puede encontrar en el repositorio TFX GitHub .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
Implementar un componente totalmente personalizado
Además de los cambios de código, todas las partes recién agregadas ( ComponentSpec
, Executor
, interfaz de componente) deben ser accesibles en el entorno de ejecución de la canalización para poder ejecutar la canalización correctamente.