A tutorial to train a TensorFlow Recommenders ranking model as a TFX pipeline.
In this notebook-based tutorial, we will create and run a TFX pipeline that trains a ranking model to predict movie ratings using TensorFlow Recommenders (TFRS). The pipeline consists of three essential TFX components: ExampleGen, Trainer, and Pusher. It covers the most minimal ML workflow: importing data, training a model, and exporting the trained TFRS ranking model.
Set Up
We first need to install the TFX Python package and download the dataset which we will use for our model.
Upgrade Pip
To avoid upgrading Pip in a system when running locally, we first check that we are running in Colab. Local systems can of course be upgraded separately.
import sys
if 'google.colab' in sys.modules:
!pip install --upgrade pip
Install TFX
pip install -U tfx
pip install -U tensorflow-recommenders
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above you must restart the runtime by clicking the "RESTART RUNTIME" button above or using the "Runtime > Restart runtime ..." menu. This is because of the way that Colab loads packages.
Before we define the pipeline, we need to write the model code for the Trainer component and save it in a file.
Check the TensorFlow and TFX versions.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
2022-12-14 13:03:42.352068: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered 2022-12-14 13:03:43.224089: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2022-12-14 13:03:43.224183: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2022-12-14 13:03:43.224193: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly. TensorFlow version: 2.10.1 TFX version: 1.11.0
Set up variables
There are a few variables used to define a pipeline. You can customize these variables as you want. By default, all output from the pipeline will be generated under the current directory. Instead of using the SchemaGen component to generate a schema, we will create a hardcoded schema for this tutorial.
import os
PIPELINE_NAME = 'TFRS-ranking'
# Directory where MovieLens 100K rating data lives
DATA_ROOT = os.path.join('data', PIPELINE_NAME)
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
Prepare example data
Since TFX does not currently support the TensorFlow Datasets API, we will download the MovieLens 100K Dataset manually for use in our TFX pipeline.
There are four numeric features in this dataset:
- userId
- movieId
- rating
- timestamp
We will build a ranking model which predicts the rating of the movies. We will not use the timestamp feature.
Because TFX ExampleGen reads inputs from a directory, we need to create a directory and copy the dataset to it.
wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
mkdir -p {DATA_ROOT}
unzip ml-100k.zip
echo 'userId,movieId,rating,timestamp' > {DATA_ROOT}/ratings.csv
sed 's/\t/,/g' ml-100k/u.data >> {DATA_ROOT}/ratings.csv
--2022-12-14 13:03:46-- https://files.grouplens.org/datasets/movielens/ml-100k.zip Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152 Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 4924029 (4.7M) [application/zip] Saving to: ‘ml-100k.zip’ ml-100k.zip 100%[===================>] 4.70M 31.0MB/s in 0.2s 2022-12-14 13:03:47 (31.0 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029] Archive: ml-100k.zip creating: ml-100k/ inflating: ml-100k/allbut.pl inflating: ml-100k/mku.sh inflating: ml-100k/README inflating: ml-100k/u.data inflating: ml-100k/u.genre inflating: ml-100k/u.info inflating: ml-100k/u.item inflating: ml-100k/u.occupation inflating: ml-100k/u.user inflating: ml-100k/u1.base inflating: ml-100k/u1.test inflating: ml-100k/u2.base inflating: ml-100k/u2.test inflating: ml-100k/u3.base inflating: ml-100k/u3.test inflating: ml-100k/u4.base inflating: ml-100k/u4.test inflating: ml-100k/u5.base inflating: ml-100k/u5.test inflating: ml-100k/ua.base inflating: ml-100k/ua.test inflating: ml-100k/ub.base inflating: ml-100k/ub.test
Take a quick look at the CSV file.
head {DATA_ROOT}/ratings.csv
userId,movieId,rating,timestamp 196,242,3,881250949 186,302,3,891717742 22,377,1,878887116 244,51,2,880606923 166,346,1,886397596 298,474,4,884182806 115,265,2,881171488 253,465,5,891628467 305,451,3,886324817
You should be able to see four values in each row. For example, the first example means that user '196' gave a rating of 3 to movie '242'.
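If you want a quick sanity check of the full dataset beyond the first few rows, a minimal sketch using pandas (optional, not required by the pipeline) could look like this:

import pandas as pd

# Load the CSV we just wrote and print a few basic statistics.
ratings_df = pd.read_csv(os.path.join(DATA_ROOT, 'ratings.csv'))
print(ratings_df.shape)  # (100000, 4): MovieLens 100K has 100,000 ratings.
print(ratings_df['rating'].describe())  # Ratings range from 1 to 5.
print(ratings_df['userId'].nunique(), 'users,',
      ratings_df['movieId'].nunique(), 'movies')  # 943 users, 1682 movies.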
Create a pipeline
TFX pipelines are defined using Python APIs. We will define a pipeline which consists of the following three components.
- CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen, which takes CSV file input.
- Trainer: Trains an ML model. The Trainer component requires model definition code from the user. You can use TensorFlow APIs to specify how to train a model and save it in the SavedModel format.
- Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as the deployment process for the trained ML model.
Before actually defining the pipeline, we need to write the model code for the Trainer component first.
Write model training code
We will build a simple ranking model to predict movie ratings. This model training code will be saved to a separate file.
In this tutorial we will use the Generic Trainer of TFX, which supports Keras-based models. You need to write a Python file containing a run_fn function, which is the entrypoint for the Trainer component.
_trainer_module_file = 'tfrs_ranking_trainer.py'
The ranking model we use is almost exactly the same as in the Basic Ranking tutorial. The only difference is that we use movie IDs instead of movie titles in the candidate tower.
%%writefile {_trainer_module_file}
from typing import Dict, Text
from typing import List
import numpy as np
import tensorflow as tf
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_recommenders as tfrs
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
_FEATURE_KEYS = ['userId', 'movieId']
_LABEL_KEY = 'rating'
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
class RankingModel(tf.keras.Model):

  def __init__(self):
    super().__init__()
    embedding_dimension = 32

    unique_user_ids = np.array(range(943)).astype(str)
    unique_movie_ids = np.array(range(1682)).astype(str)

    # Compute embeddings for users.
    self.user_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='userId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_user_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_user_ids) + 1, embedding_dimension)
    ])

    # Compute embeddings for movies.
    self.movie_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='movieId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_movie_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_movie_ids) + 1, embedding_dimension)
    ])

    # Compute predictions.
    self.ratings = tf.keras.Sequential([
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])

  def call(self, inputs):
    user_id, movie_id = inputs
    user_embedding = self.user_embeddings(user_id)
    movie_embedding = self.movie_embeddings(movie_id)
    return self.ratings(tf.concat([user_embedding, movie_embedding], axis=2))
class MovielensModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()
    self.ranking_model: tf.keras.Model = RankingModel()
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
    return self.ranking_model((features['userId'], features['movieId']))

  def compute_loss(self,
                   features: Dict[Text, tf.Tensor],
                   training=False) -> tf.Tensor:
    labels = features[1]
    rating_predictions = self(features[0])

    # The task computes the loss and the metrics.
    return self.task(labels=labels, predictions=rating_predictions)
def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 256) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()
def _build_keras_model() -> tf.keras.Model:
  return MovielensModel()
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=8192)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=4096)

  model = _build_keras_model()
  model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      epochs=3,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  model.save(fn_args.serving_model_dir)
Writing tfrs_ranking_trainer.py
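Before wiring this module into a pipeline, you can sanity-check the model class directly. This is an optional sketch, assuming the cell above has been run so that tfrs_ranking_trainer.py exists in the current directory:

import tensorflow as tf

from tfrs_ranking_trainer import MovielensModel  # the module written above

# Build the model and run a forward pass on a tiny hand-made batch.
model = MovielensModel()
predictions = model({
    'userId': tf.constant([[196], [186]], dtype=tf.int64),
    'movieId': tf.constant([[242], [302]], dtype=tf.int64),
})
print(predictions.shape)  # (2, 1, 1): one predicted rating per example.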
Now you have completed all preparation steps to build the TFX pipeline.
Write a pipeline definition
We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline, which can be run using one of the pipeline orchestration systems that TFX supports.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=12),
      eval_args=tfx.proto.EvalArgs(num_steps=24))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)
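This pipeline uses the hardcoded schema from the trainer module and, as the run logs will warn, pushes the model without validation. If you later want TFX to infer a schema from data statistics instead, a minimal sketch of the extra components (an optional extension, not used in this tutorial; these lines would go inside _create_pipeline) might look like this:

# Computes statistics over the examples, then infers a schema from them.
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)
# The Trainer could then consume `schema=schema_gen.outputs['schema']`, and
# both new components would be appended to the `components` list.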
Run the pipeline
TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment. Now we create a LocalDagRunner and pass a Pipeline object created from the function we already defined.
The pipeline runs directly and you can see logs for the progress of the pipeline, including ML model training.
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/tmpfs/src/temp/docs/examples/tfrs_ranking_trainer.py' (including modules: ['tfrs_ranking_trainer']). INFO:absl:User module package has hash fingerprint version bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c. INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '/tmpfs/tmp/tmpbks2_kfl/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmpfs/tmp/tmpbvugqpwn', '--dist-dir', '/tmpfs/tmp/tmpfqmceeau'] /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/setuptools/command/install.py:34: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools. warnings.warn( INFO:absl:Successfully built user code wheel distribution at 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'; target user module is 'tfrs_ranking_trainer'. INFO:absl:Full user module path is 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "Pusher" value { python_class_executable_spec { class_path: "tfx.components.pusher.executor.Executor" } } } executor_specs { key: "Trainer" value { python_class_executable_spec { class_path: "tfx.components.trainer.executor.GenericExecutor" } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "metadata/TFRS-ranking/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "metadata/TFRS-ranking/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "data/TFRS-ranking" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "Trainer" execution_options { caching_options { } } running bdist_wheel running build running build_py creating build creating build/lib copying tfrs_ranking_trainer.py -> build/lib installing to /tmpfs/tmp/tmpbvugqpwn running install running install_lib copying build/lib/tfrs_ranking_trainer.py -> /tmpfs/tmp/tmpbvugqpwn running install_egg_info running egg_info creating tfx_user_code_Trainer.egg-info writing tfx_user_code_Trainer.egg-info/PKG-INFO writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' Copying tfx_user_code_Trainer.egg-info to /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3.9.egg-info running install_scripts creating /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL creating '/tmpfs/tmp/tmpfqmceeau/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' and adding '/tmpfs/tmp/tmpbvugqpwn' to it adding 'tfrs_ranking_trainer.py' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/METADATA' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/top_level.txt' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/RECORD' removing /tmpfs/tmp/tmpbvugqpwn INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': 'data/TFRS-ranking', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027'}, execution_output_uri='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "data/TFRS-ranking" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "Trainer" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') INFO:absl:Generating examples. WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. INFO:absl:Processing input csv data data/TFRS-ranking/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. 
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component Trainer is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.trainer.component.Trainer" base_type: TRAIN } id: "Trainer" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "model" value { artifact_spec { type { name: "Model" base_type: MODEL } } } } outputs { key: "model_run" value { artifact_spec { type { name: "ModelRun" } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "eval_args" value { field_value { string_value: "{\n \"num_steps\": 24\n}" } } } parameters { key: "module_path" value { field_value { string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl" } } } parameters { key: "train_args" value { field_value { string_value: "{\n \"num_steps\": 12\n}" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "Pusher" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. 
INFO:absl:[Trainer] Resolved inputs: ({'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023043350 last_update_time_since_epoch: 1671023043350 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023043350 last_update_time_since_epoch: 1671023043350 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2" , artifact_type: name: "ModelRun" )], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2" , artifact_type: name: "Model" base_type: MODEL )]}), exec_properties={'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'}, execution_output_uri='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Trainer/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.trainer.component.Trainer" base_type: TRAIN } id: "Trainer" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { 
string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "model" value { artifact_spec { type { name: "Model" base_type: MODEL } } } } outputs { key: "model_run" value { artifact_spec { type { name: "ModelRun" } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "eval_args" value { field_value { string_value: "{\n \"num_steps\": 24\n}" } } } parameters { key: "module_path" value { field_value { string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl" } } } parameters { key: "train_args" value { field_value { string_value: "{\n \"num_steps\": 12\n}" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "Pusher" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') INFO:absl:Train on the 'train' split when train_args.splits is not set. INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set. INFO:absl:udf_utils.get_fn {'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'} 'run_fn' INFO:absl:Installing 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' to a temporary directory. INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '-m', 'pip', 'install', '--target', '/tmpfs/tmp/tmpkwsj4cmv', 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'] Processing ./pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl INFO:absl:Successfully installed 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'. INFO:absl:Training model. INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. Installing collected packages: tfx-user-code-Trainer Successfully installed tfx-user-code-Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature movieId has a shape dim { size: 1 } . 
Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. Epoch 1/3 12/12 [==============================] - 4s 247ms/step - root_mean_squared_error: 1.9485 - loss: 3.6361 - regularization_loss: 0.0000e+00 - total_loss: 3.6361 - val_root_mean_squared_error: 1.2136 - val_loss: 1.4467 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.4467 Epoch 2/3 12/12 [==============================] - 2s 198ms/step - root_mean_squared_error: 1.1657 - loss: 1.3525 - regularization_loss: 0.0000e+00 - total_loss: 1.3525 - val_root_mean_squared_error: 1.1173 - val_loss: 1.2615 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2615 Epoch 3/3 12/12 [==============================] - 2s 212ms/step - root_mean_squared_error: 1.1121 - loss: 1.2330 - regularization_loss: 0.0000e+00 - total_loss: 1.2330 - val_root_mean_squared_error: 1.0983 - val_loss: 1.2303 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2303 WARNING:absl:Function `_wrapped_model` contains input name(s) movieId, userId with unsupported characters which will be renamed to movieid, userid in the SavedModel. WARNING:absl:Found untraced functions such as ranking_layer_call_fn, ranking_layer_call_and_return_conditional_losses while saving (showing 2 of 2). These functions will not be directly callable after loading. INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets INFO:absl:Training complete. Model written to pipelines/TFRS-ranking/Trainer/model/2/Format-Serving. ModelRun written to pipelines/TFRS-ranking/Trainer/model_run/2 INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2" , artifact_type: name: "ModelRun" )], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2" , artifact_type: name: "Model" base_type: MODEL )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component Trainer is finished. INFO:absl:Component Pusher is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.pusher.component.Pusher" base_type: DEPLOY } id: "Pusher" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Pusher" } } } } inputs { inputs { key: "model" value { channels { producer_node_query { id: "Trainer" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } artifact_query { type { name: "Model" base_type: MODEL } } output_key: "model" } } } } outputs { outputs { key: "pushed_model" value { artifact_spec { type { name: "PushedModel" base_type: MODEL } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "push_destination" value { field_value { string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}" } } } } upstream_nodes: "Trainer" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[Pusher] Resolved inputs: ({'model': [Artifact(artifact: id: 3 type_id: 18 uri: "pipelines/TFRS-ranking/Trainer/model/2" custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023059840 last_update_time_since_epoch: 1671023059840 , artifact_type: id: 18 name: "Model" base_type: MODEL )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3 type_id: 18 uri: "pipelines/TFRS-ranking/Trainer/model/2" custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023059840 last_update_time_since_epoch: 1671023059840 , artifact_type: id: 18 name: "Model" base_type: MODEL )]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3" , artifact_type: name: "PushedModel" base_type: MODEL )]}), exec_properties={'push_destination': '{\n "filesystem": {\n "base_directory": "serving_model/TFRS-ranking"\n }\n}', 'custom_config': 'null'}, execution_output_uri='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Pusher/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.pusher.component.Pusher" base_type: DEPLOY } id: "Pusher" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: 
"2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Pusher" } } } } inputs { inputs { key: "model" value { channels { producer_node_query { id: "Trainer" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } artifact_query { type { name: "Model" base_type: MODEL } } output_key: "model" } } } } outputs { outputs { key: "pushed_model" value { artifact_spec { type { name: "PushedModel" base_type: MODEL } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "push_destination" value { field_value { string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}" } } } } upstream_nodes: "Trainer" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline. INFO:absl:Model version: 1671023059 INFO:absl:Model written to serving path serving_model/TFRS-ranking/1671023059. INFO:absl:Model pushed to pipelines/TFRS-ranking/Pusher/pushed_model/3. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3" , artifact_type: name: "PushedModel" base_type: MODEL )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component Pusher is finished.
You should see "INFO:absl:Component Pusher is finished." at the end of the logs if the pipeline finished successfully, because the Pusher component is the last component of the pipeline.
The Pusher component pushes the trained model to the SERVING_MODEL_DIR, which is the serving_model/TFRS-ranking directory if you did not change the variables in the previous steps. You can see the result from the file browser in the left-side panel in Colab, or using the following command:
# List files in created model directory.
ls -R {SERVING_MODEL_DIR}
serving_model/TFRS-ranking: 1671023059 serving_model/TFRS-ranking/1671023059: assets keras_metadata.pb saved_model.pb variables serving_model/TFRS-ranking/1671023059/assets: serving_model/TFRS-ranking/1671023059/variables: variables.data-00000-of-00001 variables.index
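The training logs above warn that the serving signature renames the inputs userId and movieId (to userid and movieid) because of unsupported characters. A minimal sketch to inspect the signature of the latest export, assuming at least one version directory exists under SERVING_MODEL_DIR:

import glob

# Inspect the serving signature of the most recent export.
latest_export = max(
    glob.glob(os.path.join(SERVING_MODEL_DIR, '*/')), key=os.path.getmtime)
signature = tf.saved_model.load(latest_export).signatures['serving_default']
print(signature.structured_input_signature)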
Now we can test the ranking model by computing predictions for a user and a movie:
import glob
# Load the latest model for testing
loaded = tf.saved_model.load(
    max(glob.glob(os.path.join(SERVING_MODEL_DIR, '*/')),
        key=os.path.getmtime))
print(loaded({'userId': [[42]], 'movieId': [[15]]}).numpy())
[[[3.6946948]]]
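Since the exported model scores any (user, movie) pair, you can also rank a set of candidate movies for one user by batching them. A minimal sketch, with arbitrary example movie IDs:

user_id = 42
candidate_movie_ids = [15, 29, 50, 100]  # arbitrary example candidates

# Score every candidate for the same user in one batched call.
scores = loaded({
    'userId': [[user_id]] * len(candidate_movie_ids),
    'movieId': [[m] for m in candidate_movie_ids],
}).numpy().squeeze()

# Print candidates with the highest predicted rating first.
for movie_id, score in sorted(
    zip(candidate_movie_ids, scores), key=lambda x: -x[1]):
  print(f'movieId {movie_id}: predicted rating {score:.2f}')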
This concludes the TensorFlow Recommenders + TFX tutorial.