Esta guía describe cómo utilizar la API de TFX para crear un componente totalmente personalizado. Los componentes completamente personalizados le permiten crear componentes definiendo la especificación del componente, el ejecutor y las clases de interfaz del componente. Este enfoque le permite reutilizar y ampliar un componente estándar para que se ajuste a sus necesidades.
Si es nuevo en las canalizaciones de TFX, obtenga más información sobre los conceptos básicos de las canalizaciones de TFX .
Ejecutor personalizado o componente personalizado
Si solo se necesita una lógica de procesamiento personalizada mientras que las entradas, las salidas y las propiedades de ejecución del componente son las mismas que las de un componente existente, un ejecutor personalizado es suficiente. Se necesita un componente completamente personalizado cuando cualquiera de las entradas, salidas o propiedades de ejecución es diferente de cualquier componente TFX existente.
¿Cómo crear un componente personalizado?
El desarrollo de un componente totalmente personalizado requiere:
- Un conjunto definido de especificaciones de artefactos de entrada y salida para el nuevo componente. Especialmente, los tipos de artefactos de entrada deben ser consistentes con los tipos de artefactos de salida de los componentes que producen los artefactos y los tipos de artefactos de salida deben ser consistentes con los tipos de artefactos de entrada de los componentes que consumen los artefactos, si los hay.
- Los parámetros de ejecución que no son artefactos que se necesitan para el nuevo componente.
Especificaciones del componente
La clase ComponentSpec
define el contrato del componente definiendo los artefactos de entrada y salida de un componente, así como los parámetros que se utilizan para la ejecución del componente. Tiene tres partes:
- ENTRADAS : un diccionario de parámetros escritos para los artefactos de entrada que se pasan al ejecutor del componente. Normalmente, los artefactos de entrada son las salidas de los componentes ascendentes y, por lo tanto, comparten el mismo tipo.
- SALIDAS : un diccionario de parámetros escritos para los artefactos de salida que produce el componente.
- PARÁMETROS : un diccionario de elementos ExecutionParameter adicionales que se pasarán al ejecutor del componente. Estos son parámetros que no son artefactos que queremos definir de manera flexible en el DSL de canalización y pasar a la ejecución.
Aquí hay un ejemplo de ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
Ejecutor
A continuación, escriba el código ejecutor del nuevo componente. Básicamente, se debe crear una nueva subclase de base_executor.BaseExecutor
con su función Do
anulada. En la función Do
, los argumentos input_dict
, output_dict
y exec_properties
que se pasan en mapa a INPUTS
, OUTPUTS
y PARAMETERS
que se definen en ComponentSpec respectivamente. Para exec_properties
, el valor se puede obtener directamente a través de una búsqueda en el diccionario. Para artefactos en input_dict
y output_dict
, hay funciones convenientes disponibles en la clase artefacto_utils que se pueden usar para obtener una instancia de artefacto o un uri de artefacto.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
Unidad de prueba de un ejecutor personalizado
Se pueden crear pruebas unitarias para el ejecutor personalizado de forma similar a esta .
Interfaz de componentes
Ahora que la parte más compleja está completa, el siguiente paso es ensamblar estas piezas en una interfaz de componente, para permitir que el componente se use en una canalización. Hay varios pasos:
- Haga que la interfaz del componente sea una subclase de
base_component.BaseComponent
- Asigne una variable de clase
SPEC_CLASS
con la claseComponentSpec
que se definió anteriormente - Asigne una variable de clase
EXECUTOR_SPEC
con la clase Executor que se definió anteriormente - Defina la función constructora
__init__()
usando los argumentos de la función para construir una instancia de la clase ComponentSpec e invoque la superfunción con ese valor, junto con un nombre opcional
Cuando se crea una instancia del componente, se invocará la lógica de verificación de tipo en la clase base_component.BaseComponent
para garantizar que los argumentos que se pasaron sean compatibles con la información de tipo definida en la clase ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
Ensamble en una canalización TFX
El último paso es conectar el nuevo componente personalizado a una canalización TFX. Además de agregar una instancia del nuevo componente, también se necesita lo siguiente:
- Cablee correctamente los componentes aguas arriba y aguas abajo del nuevo componente. Esto se hace haciendo referencia a las salidas del componente ascendente en el nuevo componente y haciendo referencia a las salidas del nuevo componente en los componentes descendentes.
- Agregue la nueva instancia de componente a la lista de componentes al construir la canalización.
El siguiente ejemplo destaca los cambios antes mencionados. El ejemplo completo se puede encontrar en el repositorio de TFX GitHub .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
Implementar un componente totalmente personalizado
Además de los cambios de código, todas las partes recién agregadas ( ComponentSpec
, Executor
, interfaz de componentes) deben ser accesibles en el entorno de ejecución de la canalización para ejecutar la canalización correctamente.