La définition de composants basée sur les fonctions Python facilite la création de composants personnalisés TFX, en vous évitant d'avoir à définir une classe de spécification de composant, une classe d'exécuteur et une classe d'interface de composant. Dans ce style de définition de composant, vous écrivez une fonction annotée avec des indications de type. Les indications de type décrivent les artefacts d'entrée, les artefacts de sortie et les paramètres de votre composant.
Écrire votre composant personnalisé dans ce style est très simple, comme dans l'exemple suivant.
class MyOutput(TypedDict):
accuracy: float
@component
def MyValidationComponent(
model: InputArtifact[Model],
blessing: OutputArtifact[Model],
accuracy_threshold: Parameter[int] = 10,
) -> MyOutput:
'''My simple custom model validation component.'''
accuracy = evaluate_model(model)
if accuracy >= accuracy_threshold:
write_output_blessing(blessing)
return {
'accuracy': accuracy
}
Sous le capot, cela définit un composant personnalisé qui est une sous-classe de BaseComponent
et de ses classes Spec et Executor.
Si vous souhaitez définir une sous-classe de BaseBeamComponent
de telle sorte que vous puissiez utiliser un pipeline de faisceau avec une configuration partagée par pipeline TFX, c'est-à-dire beam_pipeline_args
lors de la compilation du pipeline ( exemple de pipeline de taxi de Chicago ), vous pouvez définir use_beam=True
dans le décorateur et ajouter un autre BeamComponentParameter
avec la valeur par défaut None
dans votre fonction comme exemple suivant :
@component(use_beam=True)
def MyDataProcessor(
examples: InputArtifact[Example],
processed_examples: OutputArtifact[Example],
beam_pipeline: BeamComponentParameter[beam.Pipeline] = None,
) -> None:
'''My simple custom model validation component.'''
with beam_pipeline as p:
# data pipeline definition with beam_pipeline begins
...
# data pipeline definition with beam_pipeline ends
Si vous débutez avec les pipelines TFX, découvrez les concepts fondamentaux des pipelines TFX .
Entrées, sorties et paramètres
Dans TFX, les entrées et les sorties sont suivies en tant qu'objets Artefact qui décrivent l'emplacement et les propriétés des métadonnées associées aux données sous-jacentes ; ces informations sont stockées dans les métadonnées ML. Les artefacts peuvent décrire des types de données complexes ou des types de données simples, tels que : int, float, bytes ou chaînes Unicode.
Un paramètre est un argument (int, float, octets ou chaîne Unicode) d'un composant connu au moment de la construction du pipeline. Les paramètres sont utiles pour spécifier des arguments et des hyperparamètres tels que le nombre d'itérations d'entraînement, le taux d'abandon et d'autres configurations pour votre composant. Les paramètres sont stockés en tant que propriétés des exécutions de composants lorsqu'ils sont suivis dans les métadonnées ML.
Définition
Pour créer un composant personnalisé, écrivez une fonction qui implémente votre logique personnalisée et décorez-la avec le décorateur @component
du module tfx.dsl.component.experimental.decorators
. Pour définir le schéma d'entrée et de sortie de votre composant, annotez les arguments de votre fonction et renvoyez la valeur à l'aide des annotations du module tfx.dsl.component.experimental.annotations
:
Pour chaque entrée d'artefact , appliquez l'annotation d'indication de type
InputArtifact[ArtifactType]
. RemplacezArtifactType
par le type de l'artefact, qui est une sous-classe detfx.types.Artifact
. Ces entrées peuvent être des arguments facultatifs.Pour chaque artefact de sortie , appliquez l'annotation d'indication de type
OutputArtifact[ArtifactType]
. RemplacezArtifactType
par le type de l'artefact, qui est une sous-classe detfx.types.Artifact
. Les artefacts de sortie du composant doivent être transmis en tant qu'arguments d'entrée de la fonction, afin que votre composant puisse écrire les sorties dans un emplacement géré par le système et définir les propriétés de métadonnées d'artefact appropriées. Cet argument peut être facultatif ou cet argument peut être défini avec une valeur par défaut.Pour chaque paramètre , utilisez l'annotation d'indice de type
Parameter[T]
. RemplacezT
par le type du paramètre. Nous ne prenons actuellement en charge que les types Python primitifs :bool
,int
,float
,str
oubytes
.Pour le pipeline de faisceaux , utilisez l'annotation d'indice de type
BeamComponentParameter[beam.Pipeline]
. Définissez la valeur par défaut surNone
. La valeurNone
sera remplacée par un pipeline de faisceau instancié créé par_make_beam_pipeline()
deBaseBeamExecutor
Pour chaque entrée de type de données simple (
int
,float
,str
oubytes
) inconnue au moment de la construction du pipeline, utilisez l'indice de typeT
. Notez que dans la version TFX 0.22, les valeurs concrètes ne peuvent pas être transmises au moment de la construction du pipeline pour ce type d'entrée (utilisez plutôt l'annotationParameter
, comme décrit dans la section précédente). Cet argument peut être facultatif ou cet argument peut être défini avec une valeur par défaut. Si votre composant a des sorties de type de données simples (int
,float
,str
oubytes
), vous pouvez renvoyer ces sorties en utilisant unTypedDict
comme annotation de type de retour et en renvoyant un objet dict approprié.
Dans le corps de votre fonction, les artefacts d'entrée et de sortie sont transmis en tant qu'objets tfx.types.Artifact
; vous pouvez inspecter son .uri
pour obtenir son emplacement géré par le système et lire/définir toutes les propriétés. Les paramètres d'entrée et les entrées de type de données simples sont transmis en tant qu'objets du type spécifié. Les sorties de type de données simples doivent être renvoyées sous forme de dictionnaire, où les clés sont les noms de sortie appropriés et les valeurs sont les valeurs de retour souhaitées.
Le composant fonctionnel terminé peut ressembler à ceci :
from typing import TypedDict
import tfx.v1 as tfx
from tfx.dsl.component.experimental.decorators import component
class MyOutput(TypedDict):
loss: float
accuracy: float
@component
def MyTrainerComponent(
training_data: tfx.dsl.components.InputArtifact[tfx.types.standard_artifacts.Examples],
model: tfx.dsl.components.OutputArtifact[tfx.types.standard_artifacts.Model],
dropout_hyperparameter: float,
num_iterations: tfx.dsl.components.Parameter[int] = 10
) -> MyOutput:
'''My simple trainer component.'''
records = read_examples(training_data.uri)
model_obj = train_model(records, num_iterations, dropout_hyperparameter)
model_obj.write_to(model.uri)
return {
'loss': model_obj.loss,
'accuracy': model_obj.accuracy
}
# Example usage in a pipeline graph definition:
# ...
trainer = MyTrainerComponent(
examples=example_gen.outputs['examples'],
dropout_hyperparameter=other_component.outputs['dropout'],
num_iterations=1000)
pusher = Pusher(model=trainer.outputs['model'])
# ...
L'exemple précédent définit MyTrainerComponent
en tant que composant personnalisé basé sur une fonction Python. Ce composant consomme un artefact examples
en entrée et produit un artefact model
en sortie. Le composant utilise l' artifact_instance.uri
pour lire ou écrire l'artefact à son emplacement géré par le système. Le composant prend un paramètre d'entrée num_iterations
et une valeur de type de données simple dropout_hyperparameter
, et le composant génère des métriques loss
et accuracy
sous forme de valeurs de sortie de type de données simples. L'artefact model
de sortie est ensuite utilisé par le composant Pusher
.