A Keras layer for accelerating embedding lookups for large tables with TPU.
tfrs.layers.embedding.TPUEmbedding(
    feature_config: Union[tf.tpu.experimental.embedding.FeatureConfig, Iterable],
    optimizer: Optional[Union[tf.tpu.experimental.embedding.SGD,
                              tf.tpu.experimental.embedding.Adagrad,
                              tf.tpu.experimental.embedding.Adam,
                              tf.tpu.experimental.embedding.FTRL]],
    pipeline_execution_with_tensor_core: bool = False,
    batch_size: Optional[int] = None,
    embedding_feature: Optional[tf.tpu.experimental.HardwareFeature.EmbeddingFeature] = None
)
Feature and table configuration
When creating an instance of this layer, you must specify:
- The complete set of embedding tables,
- The features you expect to look up in those tables, and
- The optimizer(s) you wish to use on the tables.
See the documentation of tf.tpu.experimental.embedding.TableConfig and tf.tpu.experimental.embedding.FeatureConfig for more details on the complete set of options. We will cover the basic usage here.
table_config_one = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=...,
    dim=...)
table_config_two = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=...,
    dim=...)
feature_config = {
    'feature_one': tf.tpu.experimental.embedding.FeatureConfig(
        table=table_config_one),
    'feature_two': tf.tpu.experimental.embedding.FeatureConfig(
        table=table_config_one),
    'feature_three': tf.tpu.experimental.embedding.FeatureConfig(
        table=table_config_two)}
Optimizers
An optimizer can be globally specified by passing one of the following types of input to the optimizer argument:
- A string, one of 'sgd', 'adagrad' or 'adam', which uses the given optimizer with the default parameters.
- An instance of a Keras optimizer.
- An instance of an optimizer class from the tf.tpu.experimental.embedding module.
You may also specify an optimizer at the table level via the optimizer argument of tf.tpu.experimental.embedding.TableConfig. This will completely override the global optimizer for that table, as in the sketch below. For performance reasons it is recommended that you minimize the total number of distinct optimizers.
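For example, a single table can carry its own optimizer while the others use the global one. A minimal sketch, reusing the placeholder style above (the Adagrad settings are illustrative only):
# This table's optimizer overrides the global optimizer passed to the layer,
# but only for lookups against table_config_two.
table_config_two = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=...,
    dim=...,
    optimizer=tf.tpu.experimental.embedding.Adagrad(learning_rate=0.05))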
Dynamic Learning Rate
Using a dynamic learning rate is supported for all optimizers; all other hyperparameters are static. There are two ways of specifying a dynamic learning rate in your optimizer:
- One of the objects in the tf.keras.optimizers.schedules namespace.
- A Python callable taking no parameters which returns a scalar tensor of type tf.float32.
tf.keras.optimizers.schedules
This method of specifying a learning schedule is only possible when using a Keras optimizer. In this case, set the learning rate of the optimizer to your desired tf.keras.optimizers.schedules object.
Callable
This method can be used if you use a Keras optimizer or one of the optimizer classes in the tf.tpu.experimental.embedding namespace.
In either case you should create a callable function that returns a tensor. This function will be called once, but the ops it generates will be re-evaluated each step. Thus it is recommended that you either create a tf.Variable representing your current step counter, or use the iterations property of an optimizer whose apply_gradients you call on each training step.
with strategy.scope():
  step = tf.Variable(
      initial_value=0, trainable=False, dtype=tf.int64,
      aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
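As a sketch (assumptions: the step variable above is incremented once per training step in your training loop, and the decay constants are illustrative), a callable can be passed directly as the learning rate of an embedding optimizer:
def exponential_decay_fn():
  # Called once at layer build time; the ops it creates are re-evaluated on
  # every step, so the rate tracks the current value of `step`.
  return 0.1 * tf.math.pow(0.95, tf.cast(step, tf.float32) / 1000.0)

optimizer = tf.tpu.experimental.embedding.SGD(learning_rate=exponential_decay_fn)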
Model creation
For a functional style Keras model:
strategy = tf.distribute.TPUStrategy(...)
with strategy.scope():
  embedding_inputs = {
      'feature_one': tf.keras.Input(batch_size=1024, shape=(),
                                    dtype=tf.int32),
      'feature_two': tf.keras.Input(batch_size=1024, shape=(),
                                    dtype=tf.int32, ragged=True),
      'feature_three': tf.keras.Input(batch_size=1024, shape=(),
                                      dtype=tf.int32)}
  # embedding, feature_config and embedding_inputs all have the same nested
  # structure.
  embedding = tpu_embedding_layer.TPUEmbedding(
      feature_config=feature_config,
      optimizer=tf.tpu.experimental.embedding.SGD(0.1))(embedding_inputs)
  # Concatenate the per-feature activations along the last axis.
  logits = tf.keras.layers.Dense(1)(tf.concat(tf.nest.flatten(embedding), axis=1))
  model = tf.keras.Model(embedding_inputs, logits)
For a subclass style model:
class ModelWithEmbeddings(tf.keras.Model):
  def __init__(self):
    super().__init__()
    self.embedding_layer = tpu_embedding_layer.TPUEmbedding(
        feature_config=feature_config,
        optimizer=tf.tpu.experimental.embedding.SGD(0.1))
    # Create the dense layer once here, rather than in call, so its weights
    # are tracked and reused across calls.
    self.dense = tf.keras.layers.Dense(1)

  def call(self, inputs):
    embedding = self.embedding_layer(inputs)
    logits = self.dense(tf.concat(tf.nest.flatten(embedding), axis=1))
    return logits

with strategy.scope():
  model = ModelWithEmbeddings()
Input data
When creating a distributed dataset to be used with a model that contains a TPUEmbedding layer, a special option must be specified when calling any of the dataset distribution methods of TPUStrategy:
distributed_dataset = (
    strategy.distribute_datasets_from_function(
        dataset_fn=...,
        options=tf.distribute.InputOptions(
            experimental_fetch_to_device=False)))
dataset_iterator = iter(distributed_dataset)
Different feature inputs can have different shapes. For dense and sparse tensors, rank 2 and above is supported. For ragged tensors, only rank 2 input is supported, but you can specify an output shape of rank 2 or above. The output shape specified in the FeatureConfig has the highest priority, the input shape passed to the build method has second priority, and the input shapes auto-detected from the input features have the lowest priority. The latter two are converted to output shapes by omitting the last dimension. If a lower priority source produces output shapes that do not match a higher priority one, a ValueError is raised; a lower priority source can only override a higher priority one whose output shapes are undefined.
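A minimal sketch, assuming the output_shape argument of tf.tpu.experimental.embedding.FeatureConfig (available in recent TensorFlow releases) and the table configs defined earlier:
feature_config = {
    'feature_two': tf.tpu.experimental.embedding.FeatureConfig(
        table=table_config_one,
        # Highest priority: overrides shapes passed to build() or inferred
        # from the (rank 2) ragged input; the embedding dim is appended.
        output_shape=[1024])}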
Training and evaluation
To use this API on TPU you should use a custom training loop. Below is an example of a training and evaluation step:
@tf.function
def training_step(dataset_iterator, num_steps):
  def tpu_step(inputs):
    labels, features = inputs
    with tf.GradientTape() as tape:
      model_output = model(features)
      loss = ...  # some function of labels and model_output
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  for _ in tf.range(num_steps):
    strategy.run(tpu_step, args=(next(dataset_iterator), ))

@tf.function
def evaluation_step(dataset_iterator, num_steps):
  def tpu_step(inputs):
    labels, features = inputs
    model_output = model(features)
    # Insert your evaluation code here.

  for _ in tf.range(num_steps):
    strategy.run(tpu_step, args=(next(dataset_iterator), ))
In the above examples, we assume that the user has a dataset which returns a tuple where the second element of the tuple matches the structure of what was passed as the feature_config argument to the object initializer. Also, we utilize tf.range to get a tf.while_loop in order to increase performance.
The embedding layer does not affect checkpointing; simply checkpoint your model as normal, remembering that if you passed either a Keras optimizer or an optimizer converted from a Keras optimizer via translate_keras_optimizer, you must also checkpoint the optimizer to ensure that your slot variables are saved.
checkpoint = tf.train.Checkpoint(model=model)
checkpoint.save(...)
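If you are using a Keras-derived optimizer, a sketch that also captures its slot variables (assuming optimizer is the object whose apply_gradients you call) is:
checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
checkpoint.save(...)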
Serving
Serving is accomplished through the tf.saved_model API. The model may be exported directly from training.
First we write a tf.function that represents the serving graph. Typically this takes as input a string tensor containing serialized protos that are parsed into tensors and then passed to the model. For example:
@tf.function(input_signature=[{
    'examples':
        tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')}])
def serve_examples(examples):
  input_data = ...  # parse the examples tensor to produce input tensors.
  return model(input_data)

tf.saved_model.save(model,
                    export_dir=...,
                    signatures={'serving': serve_examples})
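One possible way to fill in the parsing step, assuming each feature is stored in the serialized tf.train.Example protos as a single int64 id (the feature spec below is hypothetical and should match your own schema):
@tf.function(input_signature=[{
    'examples': tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')}])
def serve_examples(examples):
  feature_spec = {
      'feature_one': tf.io.FixedLenFeature([], tf.int64),
      'feature_two': tf.io.FixedLenFeature([], tf.int64),
      'feature_three': tf.io.FixedLenFeature([], tf.int64)}
  parsed = tf.io.parse_example(examples['examples'], feature_spec)
  # Cast to the int32 ids expected by the embedding feature_config.
  input_data = {name: tf.cast(ids, tf.int32) for name, ids in parsed.items()}
  return model(input_data)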
The exported model can now be loaded (in Python or C++) and used for serving:
imported = tf.saved_model.load(...)
predict_fn = imported.signatures['serving']
predict_fn(...)
Using this layer on CPU
This layer can also be instantiated under a CPU strategy and used for local testing/training. Models created in this way are checkpoint compatible with models created under TPUStrategy. In order to achieve checkpoint compatibility, you must use Keras optimizers (or ones converted by translate_keras_optimizer) as your optimizers.
In the simplest case, where you use the same optimizer for your embedding and
dense layers, the training_step
above will function exactly the same in both
situations.
If you use a separate Keras optimizer for your embedding layers (e.g. you want a different hyperparameter setting or an entirely different algorithm), special care must be taken to keep behavior consistent. To understand why, there are a few technical details you need to know:
When created under TPUStrategy, the underlying table variables are not considered trainable and are not available under model.trainable_variables. The main reason for this is that the table variables are just a stand-in for the real data, which lives in the HBM of the TPU. These variables are stale and are only updated when saving and restoring checkpoints.
Because of this, a standard optimizer.apply_gradients will not work on these variables. Instead, a separate virtual trainable variable is added to the list of trainable variables, and simply computing the gradient of this variable will cause the gradient for the embeddings to be computed and the optimizer applied.
When created under a CPU strategy, the table variables are created normally and are part of the model's trainable variables. In this case, if you are using a different optimizer for the embedding tables, you must manually partition the variables and gradients so that the Keras optimizer you created for the embedding tables is applied to the tables.
E.g.,
class ModelWithSeparateOptimizer(tf.keras.Model):
  def __init__(self, optimizer):
    super().__init__()
    self.embedding_layer = tpu_embedding_layer.TPUEmbedding(
        feature_config=feature_config,
        optimizer=optimizer)
    self.dense = tf.keras.layers.Dense(1)

  def call(self, inputs):
    embedding = self.embedding_layer(inputs)
    return self.dense(tf.concat(tf.nest.flatten(embedding), axis=1))

with strategy.scope():
  embedding_optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)
  dense_optimizer = tf.keras.optimizers.Adam(learning_rate=0.1)
  model = ModelWithSeparateOptimizer(embedding_optimizer)

@tf.function
def training_step(dataset_iterator, num_steps):
  def tpu_step(inputs):
    labels, features = inputs
    with tf.GradientTape() as tape:
      model_output = model(features)
      loss = ...  # some function of labels and model_output
    gradients = tape.gradient(loss, model.trainable_variables)
    # Materialize the pairs once: zip returns an iterator that would otherwise
    # be exhausted after the first list comprehension below.
    grads_and_vars = list(zip(gradients, model.trainable_variables))
    # Note the use of 'id' here: 'x in y' uses x's equality method, and if x is
    # a tensor this is tf.math.equal rather than Python object equality.
    embedding_var_ids = [
        id(v) for v in model.embedding_layer.trainable_variables]
    dense_grads_and_vars = [
        (g, v) for g, v in grads_and_vars
        if id(v) not in embedding_var_ids]
    dense_optimizer.apply_gradients(dense_grads_and_vars)
    embedding_grads_and_vars = [
        (g, v) for g, v in grads_and_vars
        if id(v) in embedding_var_ids]
    embedding_optimizer.apply_gradients(embedding_grads_and_vars)

  for _ in tf.range(num_steps):
    strategy.run(tpu_step, args=(next(dataset_iterator), ))
The above training step works both on TPU and on CPU.
Using this layer on TPU without an embedding lookup accelerator
This layer can also be initialized on TPUs without embedding lookup accelerators. No change to the client code is required, as the layer automatically switches between different mid-level APIs based on the TPU hardware. You can also force the layer to run without acceleration by overriding the embedding feature to "UNSUPPORTED". This might be helpful when your table is relatively small.
Note that instead of sharding the table across devices, the table will be replicated across them.
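A sketch of forcing the non-accelerated path, assuming the UNSUPPORTED member of tf.tpu.experimental.HardwareFeature.EmbeddingFeature referenced in the constructor signature above:
embedding_layer = tpu_embedding_layer.TPUEmbedding(
    feature_config=feature_config,
    optimizer=tf.tpu.experimental.embedding.SGD(0.1),
    embedding_feature=(
        tf.tpu.experimental.HardwareFeature.EmbeddingFeature.UNSUPPORTED))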
Args | |
---|---|
`feature_config` | A nested structure of `tf.tpu.experimental.embedding.FeatureConfig` configs. |
`optimizer` | An instance of one of `tf.tpu.experimental.embedding.SGD`, `tf.tpu.experimental.embedding.Adagrad` or `tf.tpu.experimental.embedding.Adam`, a Keras optimizer, or a string name of an optimizer (see `tf.keras.optimizers.get`). Or, if not created under a TPU strategy, None, which avoids creating the optimizer slot variables to reduce memory consumption during export. |
`pipeline_execution_with_tensor_core` | If True, the TPU embedding computations will overlap with the TensorCore computations (and hence will be one step old, with potential correctness drawbacks). Set to True for improved performance. |
`batch_size` | Batch size of the input feature. Deprecated; retained for backward compatibility. |
`embedding_feature` | `EmbeddingFeature` enum, indicating which version of TPU hardware the layer should run on. |
Methods
call
call(
features: Any,
weights: Optional[Any] = None,
serving_config: Optional[Union[tf.tpu.experimental.embedding.FeatureConfig, Iterable]] = None
) -> Any
Look up features in the embedding tables and combine using weights.
Args | |
---|---|
`features` | A nested structure of `Tensor`s, `SparseTensor`s or `RaggedTensor`s with the same structure as `feature_config`. These tensors are used as ids to look up rows in the embedding tables using the config specified in the corresponding entry of `feature_config`. You can mix `Tensor`s and `SparseTensor`s, or `Tensor`s and `RaggedTensor`s, but not `SparseTensor`s and `RaggedTensor`s. |
`weights` | None, or a nested structure of `Tensor`s, `SparseTensor`s or `RaggedTensor`s matching `features`. These are the weights used when combining the looked up rows for a given feature and example. If None, weights of 1 will be used. |
`serving_config` | A nested structure of `tf.tpu.experimental.embedding.FeatureConfig` objects. If not None, this layer uses CPU based lookup using `serving_config` and the current set of embedding tables. |
Returns | |
---|---|
The combined embedding activations for the input ids passed in via `features`. |
Raises | |
---|---|
`RuntimeError` | If the layer is not created under a TPU strategy but is called under one. |
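As an illustrative sketch of the serving_config path (the serving_features dict and its id values are assumptions, and reusing the training feature_config as serving_config is also an assumption):
# CPU based lookup against the current embedding tables.
serving_features = {
    'feature_one': tf.constant([0, 1, 2], dtype=tf.int32),
    'feature_two': tf.constant([3, 4, 5], dtype=tf.int32),
    'feature_three': tf.constant([6, 7, 8], dtype=tf.int32)}
activations = model.embedding_layer(
    serving_features, serving_config=feature_config)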