Introducción al canal de clasificación de TensorFlow

TL;DR : Reduzca el código repetitivo para crear, entrenar y servir modelos de TensorFlow Ranking con TensorFlow Ranking Pipelines; Use estrategias distribuidas adecuadas para aplicaciones de clasificación a gran escala dado el caso de uso y los recursos.

Introducción

TensorFlow Ranking Pipeline consta de una serie de procesos de procesamiento de datos, construcción de modelos, capacitación y servicio que le permite construir, entrenar y brindar modelos de clasificación escalables basados ​​en redes neuronales a partir de registros de datos con un esfuerzo mínimo. La canalización es más eficiente cuando el sistema se amplía. En general, si su modelo tarda 10 minutos o más en ejecutarse en una sola máquina, considere usar este marco de canalización para distribuir la carga y acelerar el procesamiento.

TensorFlow Ranking Pipeline se ha ejecutado de manera constante y estable en experimentos y producciones a gran escala con big data (terabytes+) y modelos grandes (100M+ de FLOP) en sistemas distribuidos (1K+ CPU y 100+ GPU y TPU). Una vez que se prueba un modelo de TensorFlow con model.fit en una pequeña parte de los datos, se recomienda la canalización para el escaneo de hiperparámetros, el entrenamiento continuo y otras situaciones a gran escala.

Tubería de clasificación

En TensorFlow, una tubería típica para construir, entrenar y servir un modelo de clasificación incluye los siguientes pasos típicos.

  • Defina la estructura del modelo:
    • Crear entradas;
    • Crear capas de preprocesamiento;
    • Crear arquitectura de red neuronal;
  • Modelo de tren:
    • Genere conjuntos de datos de entrenamiento y validación a partir de registros de datos;
    • Prepare el modelo con los hiperparámetros adecuados:
      • Optimizador;
      • Pérdidas de clasificación;
      • métricas de clasificación;
    • Configure estrategias distribuidas para entrenar en múltiples dispositivos.
    • Configure las devoluciones de llamada para varios libros de contabilidad.
    • Modelo de exportación para servir;
  • Modelo de servicio:
    • Determinar el formato de datos al momento de servir;
    • Elija y cargue el modelo entrenado;
    • Proceso con modelo cargado.

Uno de los principales objetivos de la canalización TensorFlow Ranking es reducir el código repetitivo en los pasos, como la carga y el preprocesamiento de conjuntos de datos, la compatibilidad de los datos por lista y la función de puntuación por puntos, y la exportación de modelos. El otro objetivo importante es hacer cumplir el diseño consistente de muchos procesos inherentemente correlacionados, por ejemplo, las entradas del modelo deben ser compatibles tanto con los conjuntos de datos de entrenamiento como con el formato de los datos en servicio.

Guía de uso

Con todo el diseño anterior, el lanzamiento de un modelo de clasificación TF se divide en los siguientes pasos, como se muestra en la Figura 1.

Diagrama de canal de clasificación de TensorFlow
Figura 1 : diagrama de las clases de clasificación de TensorFlow y los pasos para entrenar modelos de clasificación con la canalización de clasificación de TF. Los módulos verdes se pueden personalizar para su modelo de clasificación.

Ejemplo usando una red neuronal distribuida

En este ejemplo, aprovechará los tfr.keras.model.FeatureSpecInputCreator , tfr.keras.pipeline.SimpleDatasetBuilder y tfr.keras.pipeline.SimplePipeline que toman en feature_spec s para definir consistentemente las características de entrada en las entradas del modelo y servidor de conjuntos de datos. La versión de cuaderno con un tutorial paso a paso se puede encontrar en el tutorial de clasificación distribuida .

Primero defina feature_spec s para las características de contexto y de ejemplo.

context_feature_spec = {}
example_feature_spec = {
    'custom_features_{}'.format(i + 1):
    tf.io.FixedLenFeature(shape=(1,), dtype=tf.float32, default_value=0.0)
    for i in range(10)
}
label_spec = ('utility', tf.io.FixedLenFeature(
    shape=(1,), dtype=tf.float32, default_value=-1))

Siga los pasos ilustrados en la Figura 1:
Defina input_creator a partir de feature_spec s.

input_creator = tfr.keras.model.FeatureSpecInputCreator(
    context_feature_spec, example_feature_spec)

Luego defina transformaciones de características de preprocesamiento para el mismo conjunto de características de entrada.

def log1p(tensor):
    return tf.math.log1p(tensor * tf.sign(tensor)) * tf.sign(tensor)
preprocessor = {
    'custom_features_{}'.format(i + 1): log1p
    for i in range(10)
}

Defina el anotador con el modelo DNN feedforward incorporado.

dnn_scorer = tfr.keras.model.DNNScorer(
    hidden_layer_dims=[1024, 512, 256],
    output_units=1,
    activation=tf.nn.relu,
    use_batch_norm=True,
    batch_norm_moment=0.99,
    dropout=0.4)

Cree el model_builder con input_creator , preprocessor y scorer .

model_builder = tfr.keras.model.ModelBuilder(
    input_creator=input_creator,
    preprocessor=preprocessor,
    scorer=dnn_scorer,
    mask_feature_name='__list_mask__',
    name='web30k_dnn_model')

Ahora configure los hiperparámetros para dataset_builder .

dataset_hparams = tfr.keras.pipeline.DatasetHparams(
    train_input_pattern='/path/to/MSLR-WEB30K-ELWC/train-*',
    valid_input_pattern='/path/to/MSLR-WEB30K-ELWC/vali-*',
    train_batch_size=128,
    valid_batch_size=128,
    list_size=200,
    dataset_reader=tf.data.RecordIODataset,
    convert_labels_to_binary=False)

Haz el dataset_builder .

tfr.keras.pipeline.SimpleDatasetBuilder(
    context_feature_spec=context_feature_spec,
    example_feature_spec=example_feature_spec,
    mask_feature_name='__list_mask__',
    label_spec=label_spec,
    hparams=dataset_hparams)

También establezca los hiperparámetros para la canalización.

pipeline_hparams = tfr.keras.pipeline.PipelineHparams(
    model_dir='/tmp/web30k_dnn_model',
    num_epochs=100,
    num_train_steps=100000,
    num_valid_steps=100,
    loss='softmax_loss',
    loss_reduction=tf.losses.Reduction.AUTO,
    optimizer='adam',
    learning_rate=0.0001,
    steps_per_execution=100,
    export_best_model=True,
    strategy='MirroredStrategy',
    tpu=None)

Haz el ranking_pipeline y entrena.

ranking_pipeline = tfr.keras.pipeline.SimplePipeline(
    model_builder=model_builder,
    dataset_builder=dataset_builder,
    hparams=pipeline_hparams,
)
ranking_pipeline.train_and_validate()

Diseño de canalización de clasificación de TensorFlow

TensorFlow Ranking Pipeline ayuda a ahorrar tiempo de ingeniería con el código repetitivo y, al mismo tiempo, permite la flexibilidad de personalización a través de la anulación y la subclasificación. Para lograr esto, la canalización presenta clases personalizables tfr.keras.model.AbstractModelBuilder , tfr.keras.pipeline.AbstractDatasetBuilder y tfr.keras.pipeline.AbstractPipeline para configurar la canalización TensorFlow Ranking.

Diseño de clases de canalización de clasificación de TensorFlow
Figura 2 : diseño general de las clases de canalización de clasificación de TensorFlow.

ModelBuilder

El código repetitivo relacionado con la construcción del modelo de Keras está integrado en AbstractModelBuilder , que se pasa a AbstractPipeline y se llama dentro de la tubería para construir el modelo bajo el alcance de la estrategia. Esto se muestra en la Figura 1. Los métodos de clase se definen en la clase base abstracta.

class AbstractModelBuilder:
  def __init__(self, mask_feature_name, name):

  @abstractmethod
  def create_inputs(self):
    // To create tf.keras.Input. Abstract method, to be overridden.
    ...
  @abstractmethod
  def preprocess(self, context_inputs, example_inputs, mask):
    // To preprocess input features. Abstract method, to be overridden.
    ...
  @abstractmethod
  def score(self, context_features, example_features, mask):
    // To score based on preprocessed features. Abstract method, to be overridden.
    ...
  def build(self):
    context_inputs, example_inputs, mask = self.create_inputs()
    context_features, example_features = self.preprocess(
        context_inputs, example_inputs, mask)
    logits = self.score(context_features, example_features, mask)
    return tf.keras.Model(inputs=..., outputs=logits, name=self._name)

Puede subclasificar directamente AbstractModelBuilder y sobrescribir con los métodos concretos para la personalización, como

class MyModelBuilder(AbstractModelBuilder):
  def create_inputs(self, ...):
  ...

Al mismo tiempo, debe usar ModelBuilder con características de entrada, transformaciones de preprocesamiento y funciones de puntuación especificadas como entradas de función input_creator , preprocessor y scorer en la clase init en lugar de subclases.

class ModelBuilder(AbstractModelBuilder):
  def __init__(self, input_creator, preprocessor, scorer, mask_feature_name, name):
  ...

Para reducir los repetitivos de la creación de estas entradas, se proporcionan las clases de funciones tfr.keras.model.InputCreator para input_creator , tfr.keras.model.Preprocessor para preprocessor y tfr.keras.model.Scorer para scorer , junto con subclases concretas tfr.keras.model.FeatureSpecInputCreator , tfr.keras.model.TypeSpecInputCreator , tfr.keras.model.PreprocessorWithSpec , tfr.keras.model.UnivariateScorer , tfr.keras.model.DNNScorer y tfr.keras.model.GAMScorer . Estos deberían cubrir la mayoría de los casos de uso comunes.

Tenga en cuenta que estas clases de funciones son clases de Keras, por lo que no es necesaria la serialización. La creación de subclases es la forma recomendada de personalizarlos.

Generador de conjuntos de datos

La clase DatasetBuilder recopila repeticiones relacionadas con conjuntos de datos. Los datos se pasan a Pipeline y se llaman para servir los conjuntos de datos de entrenamiento y validación y para definir las firmas de servicio para los modelos guardados. Como se muestra en la Figura 1, los métodos de DatasetBuilder se definen en la clase base tfr.keras.pipeline.AbstractDatasetBuilder ,

class AbstractDatasetBuilder:

  @abstractmethod
  def build_train_dataset(self, *arg, **kwargs):
    // To return the training dataset.
    ...
  @abstractmethod
  def build_valid_dataset(self, *arg, **kwargs):
    // To return the validation dataset.
    ...
  @abstractmethod
  def build_signatures(self, *arg, **kwargs):
    // To build the signatures to export saved model.
    ...

En una clase concreta de DatasetBuilder , debe implementar build_train_datasets , build_valid_datasets y build_signatures .

También se proporciona una clase concreta que crea conjuntos de datos a partir de feature_spec s:

class BaseDatasetBuilder(AbstractDatasetBuilder):

  def __init__(self, context_feature_spec, example_feature_spec,
               training_only_example_spec,
               mask_feature_name, hparams,
               training_only_context_spec=None):
    // Specify label and weight specs in training_only_example_spec.
    ...
  def _features_and_labels(self, features):
    // To split the labels and weights from input features.
    ...

  def _build_dataset(self, ...):
    return tfr.data.build_ranking_dataset(
        context_feature_spec+training_only_context_spec,
        example_feature_spec+training_only_example_spec, mask_feature_name, ...)

  def build_train_dataset(self):
    return self._build_dataset(...)

  def build_valid_dataset(self):
    return self._build_dataset(...)

  def build_signatures(self, model):
    return saved_model.Signatures(model, context_feature_spec,
                                  example_feature_spec, mask_feature_name)()

Los hparams que se utilizan en DatasetBuilder se especifican en la clase de tfr.keras.pipeline.DatasetHparams .

Tubería

Ranking Pipeline se basa en la clase tfr.keras.pipeline.AbstractPipeline :

class AbstractPipeline:

  @abstractmethod
  def build_loss(self):
    // Returns a tf.keras.losses.Loss or a dict of Loss. To be overridden.
    ...
  @abstractmethod
  def build_metrics(self):
    // Returns a list of evaluation metrics. To be overridden.
    ...
  @abstractmethod
  def build_weighted_metrics(self):
    // Returns a list of weighted metrics. To be overridden.
    ...
  @abstractmethod
  def train_and_validate(self, *arg, **kwargs):
    // Main function to run the training pipeline. To be overridden.
    ...

También se proporciona una clase de canalización concreta que entrena el modelo con diferentes tf.distribute.strategy compatibles con model.fit :

class ModelFitPipeline(AbstractPipeline):

  def __init__(self, model_builder, dataset_builder, hparams):
    ...
  def build_callbacks(self):
    // Builds callbacks used in model.fit. Override for customized usage.
    ...
  def export_saved_model(self, model, export_to, checkpoint=None):
    if checkpoint:
      model.load_weights(checkpoint)
    model.save(export_to, signatures=dataset_builder.build_signatures(model))

  def train_and_validate(self, verbose=0):
    with self._strategy.scope():
      model = model_builder.build()
      model.compile(
          optimizer,
          loss=self.build_loss(),
          metrics=self.build_metrics(),
          loss_weights=self.hparams.loss_weights,
          weighted_metrics=self.build_weighted_metrics())
      train_dataset, valid_dataset = (
          dataset_builder.build_train_dataset(),
          dataset_builder.build_valid_dataset())
      model.fit(
          x=train_dataset,
          validation_data=valid_dataset,
          callbacks=self.build_callbacks(),
          verbose=verbose)
      self.export_saved_model(model, export_to=model_output_dir)

Los hparams utilizados en tfr.keras.pipeline.ModelFitPipeline se especifican en la clase de tfr.keras.pipeline.PipelineHparams . Esta clase ModelFitPipeline es suficiente para la mayoría de los casos de uso de TF Ranking. Los clientes pueden subclasificarlo fácilmente para propósitos específicos.

Soporte de estrategia distribuida

Consulte la capacitación distribuida para obtener una introducción detallada de las estrategias distribuidas compatibles con TensorFlow. Actualmente, la canalización de TensorFlow Ranking es compatible con tf.distribute.MirroredStrategy (predeterminado), tf.distribute.TPUStrategy , tf.distribute.MultiWorkerMirroredStrategy y tf.distribute.ParameterServerStrategy . La estrategia duplicada es compatible con la mayoría de los sistemas de una sola máquina. Establezca la strategy en None para ninguna estrategia distribuida.

En general, MirroredStrategy funciona para modelos relativamente pequeños en la mayoría de los dispositivos con opciones de CPU y GPU. MultiWorkerMirroredStrategy funciona para modelos grandes que no caben en un trabajador. ParameterServerStrategy realiza un entrenamiento asíncrono y requiere varios trabajadores disponibles. TPUStrategy es ideal para grandes modelos y big data cuando hay TPU disponibles; sin embargo, es menos flexible en cuanto a las formas de tensor que puede manejar.

preguntas frecuentes

  1. El conjunto mínimo de componentes para usar RankingPipeline
    Vea el código de ejemplo anterior.

  2. ¿Y si tengo mi propio model Keras?
    Para ser entrenado con estrategias tf.distribute , el model debe construirse con todas las variables entrenables definidas en la estrategia.scope(). Así que envuelva su modelo en ModelBuilder como,

class MyModelBuilder(AbstractModelBuilder):
  def __init__(self, model, context_feature_names, example_feature_names,
               mask_feature_name, name):
    super().__init__(mask_feature_name, name)
    self._model = model
    self._context_feature_names = context_feature_names
    self._example_feature_names = example_feature_names

  def create_inputs(self):
    inputs = self._model.input
    context_inputs = {inputs[name] for name in self._context_feature_names}
    example_inputs = {inputs[name] for name in self._example_feature_names}
    mask = inputs[self._mask_feature_name]
    return context_inputs, example_inputs, mask

  def preprocess(self, context_inputs, example_inputs, mask):
    return context_inputs, example_inputs, mask

  def score(self, context_features, example_features, mask):
    inputs = dict(
        list(context_features.items()) + list(example_features.items()) +
        [(self._mask_feature_name, mask)])
    return self._model(inputs)

model_builder = MyModelBuilder(model, context_feature_names, example_feature_names,
                               mask_feature_name, "my_model")

Luego, ingrese este model_builder a la canalización para obtener más capacitación.

,

TL;DR : Reduzca el código repetitivo para crear, entrenar y servir modelos de TensorFlow Ranking con TensorFlow Ranking Pipelines; Use estrategias distribuidas adecuadas para aplicaciones de clasificación a gran escala dado el caso de uso y los recursos.

Introducción

TensorFlow Ranking Pipeline consta de una serie de procesos de procesamiento de datos, construcción de modelos, capacitación y servicio que le permite construir, entrenar y brindar modelos de clasificación escalables basados ​​en redes neuronales a partir de registros de datos con un esfuerzo mínimo. La canalización es más eficiente cuando el sistema se amplía. En general, si su modelo tarda 10 minutos o más en ejecutarse en una sola máquina, considere usar este marco de canalización para distribuir la carga y acelerar el procesamiento.

TensorFlow Ranking Pipeline se ha ejecutado de manera constante y estable en experimentos y producciones a gran escala con big data (terabytes+) y modelos grandes (100M+ de FLOP) en sistemas distribuidos (1K+ CPU y 100+ GPU y TPU). Una vez que se prueba un modelo de TensorFlow con model.fit en una pequeña parte de los datos, se recomienda la canalización para el escaneo de hiperparámetros, el entrenamiento continuo y otras situaciones a gran escala.

Tubería de clasificación

En TensorFlow, una tubería típica para construir, entrenar y servir un modelo de clasificación incluye los siguientes pasos típicos.

  • Defina la estructura del modelo:
    • Crear entradas;
    • Crear capas de preprocesamiento;
    • Crear arquitectura de red neuronal;
  • Modelo de tren:
    • Genere conjuntos de datos de entrenamiento y validación a partir de registros de datos;
    • Prepare el modelo con los hiperparámetros adecuados:
      • Optimizador;
      • Pérdidas de clasificación;
      • métricas de clasificación;
    • Configure estrategias distribuidas para entrenar en múltiples dispositivos.
    • Configure las devoluciones de llamada para varios libros de contabilidad.
    • Modelo de exportación para servir;
  • Modelo de servicio:
    • Determinar el formato de datos al momento de servir;
    • Elija y cargue el modelo entrenado;
    • Proceso con modelo cargado.

Uno de los principales objetivos de la canalización TensorFlow Ranking es reducir el código repetitivo en los pasos, como la carga y el preprocesamiento de conjuntos de datos, la compatibilidad de los datos por lista y la función de puntuación por puntos, y la exportación de modelos. El otro objetivo importante es hacer cumplir el diseño consistente de muchos procesos inherentemente correlacionados, por ejemplo, las entradas del modelo deben ser compatibles tanto con los conjuntos de datos de entrenamiento como con el formato de los datos en servicio.

Guía de uso

Con todo el diseño anterior, el lanzamiento de un modelo de clasificación TF se divide en los siguientes pasos, como se muestra en la Figura 1.

Diagrama de canal de clasificación de TensorFlow
Figura 1 : diagrama de las clases de clasificación de TensorFlow y los pasos para entrenar modelos de clasificación con la canalización de clasificación de TF. Los módulos verdes se pueden personalizar para su modelo de clasificación.

Ejemplo usando una red neuronal distribuida

En este ejemplo, aprovechará los tfr.keras.model.FeatureSpecInputCreator , tfr.keras.pipeline.SimpleDatasetBuilder y tfr.keras.pipeline.SimplePipeline que toman en feature_spec s para definir consistentemente las características de entrada en las entradas del modelo y servidor de conjuntos de datos. La versión de cuaderno con un tutorial paso a paso se puede encontrar en el tutorial de clasificación distribuida .

Primero defina feature_spec s para las características de contexto y de ejemplo.

context_feature_spec = {}
example_feature_spec = {
    'custom_features_{}'.format(i + 1):
    tf.io.FixedLenFeature(shape=(1,), dtype=tf.float32, default_value=0.0)
    for i in range(10)
}
label_spec = ('utility', tf.io.FixedLenFeature(
    shape=(1,), dtype=tf.float32, default_value=-1))

Siga los pasos ilustrados en la Figura 1:
Defina input_creator a partir de feature_spec s.

input_creator = tfr.keras.model.FeatureSpecInputCreator(
    context_feature_spec, example_feature_spec)

Luego defina transformaciones de características de preprocesamiento para el mismo conjunto de características de entrada.

def log1p(tensor):
    return tf.math.log1p(tensor * tf.sign(tensor)) * tf.sign(tensor)
preprocessor = {
    'custom_features_{}'.format(i + 1): log1p
    for i in range(10)
}

Defina el anotador con el modelo DNN feedforward incorporado.

dnn_scorer = tfr.keras.model.DNNScorer(
    hidden_layer_dims=[1024, 512, 256],
    output_units=1,
    activation=tf.nn.relu,
    use_batch_norm=True,
    batch_norm_moment=0.99,
    dropout=0.4)

Cree el model_builder con input_creator , preprocessor y scorer .

model_builder = tfr.keras.model.ModelBuilder(
    input_creator=input_creator,
    preprocessor=preprocessor,
    scorer=dnn_scorer,
    mask_feature_name='__list_mask__',
    name='web30k_dnn_model')

Ahora configure los hiperparámetros para dataset_builder .

dataset_hparams = tfr.keras.pipeline.DatasetHparams(
    train_input_pattern='/path/to/MSLR-WEB30K-ELWC/train-*',
    valid_input_pattern='/path/to/MSLR-WEB30K-ELWC/vali-*',
    train_batch_size=128,
    valid_batch_size=128,
    list_size=200,
    dataset_reader=tf.data.RecordIODataset,
    convert_labels_to_binary=False)

Haz el dataset_builder .

tfr.keras.pipeline.SimpleDatasetBuilder(
    context_feature_spec=context_feature_spec,
    example_feature_spec=example_feature_spec,
    mask_feature_name='__list_mask__',
    label_spec=label_spec,
    hparams=dataset_hparams)

También establezca los hiperparámetros para la canalización.

pipeline_hparams = tfr.keras.pipeline.PipelineHparams(
    model_dir='/tmp/web30k_dnn_model',
    num_epochs=100,
    num_train_steps=100000,
    num_valid_steps=100,
    loss='softmax_loss',
    loss_reduction=tf.losses.Reduction.AUTO,
    optimizer='adam',
    learning_rate=0.0001,
    steps_per_execution=100,
    export_best_model=True,
    strategy='MirroredStrategy',
    tpu=None)

Haz el ranking_pipeline y entrena.

ranking_pipeline = tfr.keras.pipeline.SimplePipeline(
    model_builder=model_builder,
    dataset_builder=dataset_builder,
    hparams=pipeline_hparams,
)
ranking_pipeline.train_and_validate()

Diseño de canalización de clasificación de TensorFlow

TensorFlow Ranking Pipeline ayuda a ahorrar tiempo de ingeniería con el código repetitivo y, al mismo tiempo, permite la flexibilidad de personalización a través de la anulación y la subclasificación. Para lograr esto, la canalización presenta clases personalizables tfr.keras.model.AbstractModelBuilder , tfr.keras.pipeline.AbstractDatasetBuilder y tfr.keras.pipeline.AbstractPipeline para configurar la canalización TensorFlow Ranking.

Diseño de clases de canalización de clasificación de TensorFlow
Figura 2 : diseño general de las clases de canalización de clasificación de TensorFlow.

ModelBuilder

El código repetitivo relacionado con la construcción del modelo de Keras está integrado en AbstractModelBuilder , que se pasa a AbstractPipeline y se llama dentro de la tubería para construir el modelo bajo el alcance de la estrategia. Esto se muestra en la Figura 1. Los métodos de clase se definen en la clase base abstracta.

class AbstractModelBuilder:
  def __init__(self, mask_feature_name, name):

  @abstractmethod
  def create_inputs(self):
    // To create tf.keras.Input. Abstract method, to be overridden.
    ...
  @abstractmethod
  def preprocess(self, context_inputs, example_inputs, mask):
    // To preprocess input features. Abstract method, to be overridden.
    ...
  @abstractmethod
  def score(self, context_features, example_features, mask):
    // To score based on preprocessed features. Abstract method, to be overridden.
    ...
  def build(self):
    context_inputs, example_inputs, mask = self.create_inputs()
    context_features, example_features = self.preprocess(
        context_inputs, example_inputs, mask)
    logits = self.score(context_features, example_features, mask)
    return tf.keras.Model(inputs=..., outputs=logits, name=self._name)

Puede subclasificar directamente AbstractModelBuilder y sobrescribir con los métodos concretos para la personalización, como

class MyModelBuilder(AbstractModelBuilder):
  def create_inputs(self, ...):
  ...

Al mismo tiempo, debe usar ModelBuilder con características de entrada, transformaciones de preprocesamiento y funciones de puntuación especificadas como entradas de función input_creator , preprocessor y scorer en la clase init en lugar de subclases.

class ModelBuilder(AbstractModelBuilder):
  def __init__(self, input_creator, preprocessor, scorer, mask_feature_name, name):
  ...

Para reducir los repetitivos de la creación de estas entradas, se proporcionan las clases de funciones tfr.keras.model.InputCreator para input_creator , tfr.keras.model.Preprocessor para preprocessor y tfr.keras.model.Scorer para scorer , junto con subclases concretas tfr.keras.model.FeatureSpecInputCreator , tfr.keras.model.TypeSpecInputCreator , tfr.keras.model.PreprocessorWithSpec , tfr.keras.model.UnivariateScorer , tfr.keras.model.DNNScorer y tfr.keras.model.GAMScorer . Estos deberían cubrir la mayoría de los casos de uso comunes.

Tenga en cuenta que estas clases de funciones son clases de Keras, por lo que no es necesaria la serialización. La creación de subclases es la forma recomendada de personalizarlos.

Generador de conjuntos de datos

La clase DatasetBuilder recopila repeticiones relacionadas con conjuntos de datos. Los datos se pasan a Pipeline y se llaman para servir los conjuntos de datos de entrenamiento y validación y para definir las firmas de servicio para los modelos guardados. Como se muestra en la Figura 1, los métodos de DatasetBuilder se definen en la clase base tfr.keras.pipeline.AbstractDatasetBuilder ,

class AbstractDatasetBuilder:

  @abstractmethod
  def build_train_dataset(self, *arg, **kwargs):
    // To return the training dataset.
    ...
  @abstractmethod
  def build_valid_dataset(self, *arg, **kwargs):
    // To return the validation dataset.
    ...
  @abstractmethod
  def build_signatures(self, *arg, **kwargs):
    // To build the signatures to export saved model.
    ...

En una clase concreta de DatasetBuilder , debe implementar build_train_datasets , build_valid_datasets y build_signatures .

También se proporciona una clase concreta que crea conjuntos de datos a partir de feature_spec s:

class BaseDatasetBuilder(AbstractDatasetBuilder):

  def __init__(self, context_feature_spec, example_feature_spec,
               training_only_example_spec,
               mask_feature_name, hparams,
               training_only_context_spec=None):
    // Specify label and weight specs in training_only_example_spec.
    ...
  def _features_and_labels(self, features):
    // To split the labels and weights from input features.
    ...

  def _build_dataset(self, ...):
    return tfr.data.build_ranking_dataset(
        context_feature_spec+training_only_context_spec,
        example_feature_spec+training_only_example_spec, mask_feature_name, ...)

  def build_train_dataset(self):
    return self._build_dataset(...)

  def build_valid_dataset(self):
    return self._build_dataset(...)

  def build_signatures(self, model):
    return saved_model.Signatures(model, context_feature_spec,
                                  example_feature_spec, mask_feature_name)()

Los hparams que se utilizan en DatasetBuilder se especifican en la clase de tfr.keras.pipeline.DatasetHparams .

Tubería

Ranking Pipeline se basa en la clase tfr.keras.pipeline.AbstractPipeline :

class AbstractPipeline:

  @abstractmethod
  def build_loss(self):
    // Returns a tf.keras.losses.Loss or a dict of Loss. To be overridden.
    ...
  @abstractmethod
  def build_metrics(self):
    // Returns a list of evaluation metrics. To be overridden.
    ...
  @abstractmethod
  def build_weighted_metrics(self):
    // Returns a list of weighted metrics. To be overridden.
    ...
  @abstractmethod
  def train_and_validate(self, *arg, **kwargs):
    // Main function to run the training pipeline. To be overridden.
    ...

También se proporciona una clase de canalización concreta que entrena el modelo con diferentes tf.distribute.strategy compatibles con model.fit :

class ModelFitPipeline(AbstractPipeline):

  def __init__(self, model_builder, dataset_builder, hparams):
    ...
  def build_callbacks(self):
    // Builds callbacks used in model.fit. Override for customized usage.
    ...
  def export_saved_model(self, model, export_to, checkpoint=None):
    if checkpoint:
      model.load_weights(checkpoint)
    model.save(export_to, signatures=dataset_builder.build_signatures(model))

  def train_and_validate(self, verbose=0):
    with self._strategy.scope():
      model = model_builder.build()
      model.compile(
          optimizer,
          loss=self.build_loss(),
          metrics=self.build_metrics(),
          loss_weights=self.hparams.loss_weights,
          weighted_metrics=self.build_weighted_metrics())
      train_dataset, valid_dataset = (
          dataset_builder.build_train_dataset(),
          dataset_builder.build_valid_dataset())
      model.fit(
          x=train_dataset,
          validation_data=valid_dataset,
          callbacks=self.build_callbacks(),
          verbose=verbose)
      self.export_saved_model(model, export_to=model_output_dir)

Los hparams utilizados en tfr.keras.pipeline.ModelFitPipeline se especifican en la clase de tfr.keras.pipeline.PipelineHparams . Esta clase ModelFitPipeline es suficiente para la mayoría de los casos de uso de TF Ranking. Los clientes pueden subclasificarlo fácilmente para propósitos específicos.

Soporte de estrategia distribuida

Consulte la capacitación distribuida para obtener una introducción detallada de las estrategias distribuidas compatibles con TensorFlow. Actualmente, la canalización de TensorFlow Ranking es compatible con tf.distribute.MirroredStrategy (predeterminado), tf.distribute.TPUStrategy , tf.distribute.MultiWorkerMirroredStrategy y tf.distribute.ParameterServerStrategy . La estrategia duplicada es compatible con la mayoría de los sistemas de una sola máquina. Establezca la strategy en None para ninguna estrategia distribuida.

En general, MirroredStrategy funciona para modelos relativamente pequeños en la mayoría de los dispositivos con opciones de CPU y GPU. MultiWorkerMirroredStrategy funciona para modelos grandes que no caben en un trabajador. ParameterServerStrategy realiza un entrenamiento asíncrono y requiere varios trabajadores disponibles. TPUStrategy es ideal para grandes modelos y big data cuando hay TPU disponibles; sin embargo, es menos flexible en cuanto a las formas de tensor que puede manejar.

preguntas frecuentes

  1. El conjunto mínimo de componentes para usar RankingPipeline
    Vea el código de ejemplo anterior.

  2. ¿Y si tengo mi propio model Keras?
    Para ser entrenado con estrategias tf.distribute , el model debe construirse con todas las variables entrenables definidas en la estrategia.scope(). Así que envuelva su modelo en ModelBuilder como,

class MyModelBuilder(AbstractModelBuilder):
  def __init__(self, model, context_feature_names, example_feature_names,
               mask_feature_name, name):
    super().__init__(mask_feature_name, name)
    self._model = model
    self._context_feature_names = context_feature_names
    self._example_feature_names = example_feature_names

  def create_inputs(self):
    inputs = self._model.input
    context_inputs = {inputs[name] for name in self._context_feature_names}
    example_inputs = {inputs[name] for name in self._example_feature_names}
    mask = inputs[self._mask_feature_name]
    return context_inputs, example_inputs, mask

  def preprocess(self, context_inputs, example_inputs, mask):
    return context_inputs, example_inputs, mask

  def score(self, context_features, example_features, mask):
    inputs = dict(
        list(context_features.items()) + list(example_features.items()) +
        [(self._mask_feature_name, mask)])
    return self._model(inputs)

model_builder = MyModelBuilder(model, context_feature_names, example_feature_names,
                               mask_feature_name, "my_model")

Luego, ingrese este model_builder a la canalización para obtener más capacitación.