Introduzione alla pipeline di classificazione di TensorFlow

TL;DR : ridurre il codice boilerplate per creare, addestrare e servire modelli TensorFlow Ranking con pipeline TensorFlow Ranking; Utilizzare strategie distribuite adeguate per applicazioni di classificazione su larga scala in base al caso d'uso e alle risorse.

Introduzione

TensorFlow Ranking Pipeline è costituito da una serie di processi di elaborazione dati, creazione di modelli, training e servizi che ti consentono di costruire, addestrare e servire modelli di ranking scalabili basati su rete neurale dai log di dati con sforzi minimi. La pipeline è più efficiente quando il sistema si espande. In generale, se l'esecuzione del tuo modello su una singola macchina richiede 10 minuti o più, prendi in considerazione l'utilizzo di questo framework di pipeline per distribuire il carico e accelerare l'elaborazione.

La TensorFlow Ranking Pipeline è stata costantemente e stabilmente utilizzata in esperimenti e produzioni su larga scala con big data (terabyte+) e grandi modelli (100 milioni+ di FLOP) su sistemi distribuiti (1K+ CPU e 100+ GPU e TPU). Una volta testato un modello TensorFlow con model.fit su una piccola parte dei dati, la pipeline è consigliata per la scansione degli iperparametri, la formazione continua e altre situazioni su larga scala.

Pipeline di classifica

In TensorFlow, una tipica pipeline per costruire, addestrare e servire un modello di ranking include i seguenti passaggi tipici.

  • Definire la struttura del modello:
    • Creare input;
    • Creare livelli di pre-elaborazione;
    • Creare un'architettura di rete neurale;
  • Modello del treno:
    • Generare set di dati di training e convalida dai log di dati;
    • Preparare il modello con gli iperparametri adeguati:
      • Ottimizzatore;
      • Perdite in classifica;
      • Metriche di posizionamento;
    • Configura strategie distribuite per l'addestramento su più dispositivi.
    • Configurare richiamate per varie contabilità.
    • Modello di esportazione per servire;
  • Servire il modello:
    • Determinare il formato dei dati al momento del servizio;
    • Scegli e carica il modello addestrato;
    • Processo con modello caricato.

Uno degli obiettivi principali della pipeline TensorFlow Ranking è ridurre il codice boilerplate nei passaggi, come il caricamento e la preelaborazione del set di dati, la compatibilità dei dati listwise e della funzione di punteggio pointwise e l'esportazione del modello. L'altro obiettivo importante è quello di applicare la progettazione coerente di molti processi intrinsecamente correlati, ad esempio, gli input del modello devono essere compatibili sia con i set di dati di addestramento che con il formato dei dati al momento del servizio.

Guida all'uso

Con tutto il progetto di cui sopra, il lancio di un modello di classificazione TF prevede i seguenti passaggi, come mostrato nella Figura 1.

Diagramma della pipeline di classificazione di TensorFlow
Figura 1 : diagramma delle classi TensorFlow Ranking e passaggi per addestrare i modelli di classificazione con la pipeline TF Ranking. I moduli verdi possono essere personalizzati per il tuo modello di ranking.

Esempio utilizzando una rete neurale distribuita

In questo esempio, utilizzerai tfr.keras.model.FeatureSpecInputCreator , tfr.keras.pipeline.SimpleDatasetBuilder integrati e tfr.keras.pipeline.SimplePipeline che accettano feature_spec per definire in modo coerente le funzionalità di input negli input del modello e server del set di dati. La versione notebook con una procedura dettagliata può essere trovata nel tutorial sulla classificazione distribuita .

Per prima cosa definisci feature_spec sia per le funzionalità di contesto che per quelle di esempio.

context_feature_spec = {}
example_feature_spec = {
    'custom_features_{}'.format(i + 1):
    tf.io.FixedLenFeature(shape=(1,), dtype=tf.float32, default_value=0.0)
    for i in range(10)
}
label_spec = ('utility', tf.io.FixedLenFeature(
    shape=(1,), dtype=tf.float32, default_value=-1))

Seguire i passaggi illustrati nella Figura 1:
Definire input_creator da feature_spec s.

input_creator = tfr.keras.model.FeatureSpecInputCreator(
    context_feature_spec, example_feature_spec)

Quindi definire le trasformazioni delle funzionalità di preelaborazione per lo stesso set di funzionalità di input.

def log1p(tensor):
    return tf.math.log1p(tensor * tf.sign(tensor)) * tf.sign(tensor)
preprocessor = {
    'custom_features_{}'.format(i + 1): log1p
    for i in range(10)
}

Definisci il punteggio con il modello DNN feedforward integrato.

dnn_scorer = tfr.keras.model.DNNScorer(
    hidden_layer_dims=[1024, 512, 256],
    output_units=1,
    activation=tf.nn.relu,
    use_batch_norm=True,
    batch_norm_moment=0.99,
    dropout=0.4)

Crea model_builder con input_creator , preprocessor e scorer .

model_builder = tfr.keras.model.ModelBuilder(
    input_creator=input_creator,
    preprocessor=preprocessor,
    scorer=dnn_scorer,
    mask_feature_name='__list_mask__',
    name='web30k_dnn_model')

Ora imposta gli iperparametri per dataset_builder .

dataset_hparams = tfr.keras.pipeline.DatasetHparams(
    train_input_pattern='/path/to/MSLR-WEB30K-ELWC/train-*',
    valid_input_pattern='/path/to/MSLR-WEB30K-ELWC/vali-*',
    train_batch_size=128,
    valid_batch_size=128,
    list_size=200,
    dataset_reader=tf.data.RecordIODataset,
    convert_labels_to_binary=False)

Crea il dataset_builder .

tfr.keras.pipeline.SimpleDatasetBuilder(
    context_feature_spec=context_feature_spec,
    example_feature_spec=example_feature_spec,
    mask_feature_name='__list_mask__',
    label_spec=label_spec,
    hparams=dataset_hparams)

Imposta anche gli iperparametri per la pipeline.

pipeline_hparams = tfr.keras.pipeline.PipelineHparams(
    model_dir='/tmp/web30k_dnn_model',
    num_epochs=100,
    num_train_steps=100000,
    num_valid_steps=100,
    loss='softmax_loss',
    loss_reduction=tf.losses.Reduction.AUTO,
    optimizer='adam',
    learning_rate=0.0001,
    steps_per_execution=100,
    export_best_model=True,
    strategy='MirroredStrategy',
    tpu=None)

Crea ranking_pipeline e allenati.

ranking_pipeline = tfr.keras.pipeline.SimplePipeline(
    model_builder=model_builder,
    dataset_builder=dataset_builder,
    hparams=pipeline_hparams,
)
ranking_pipeline.train_and_validate()

Progettazione della pipeline di classificazione TensorFlow

La pipeline di classificazione TensorFlow aiuta a risparmiare tempo di progettazione con il codice boilerplate e, allo stesso tempo, consente flessibilità di personalizzazione tramite override e sottoclassi. Per raggiungere questo obiettivo, la pipeline introduce classi personalizzabili tfr.keras.model.AbstractModelBuilder , tfr.keras.pipeline.AbstractDatasetBuilder e tfr.keras.pipeline.AbstractPipeline per impostare la pipeline TensorFlow Ranking.

Progettazione delle classi Pipeline di TensorFlow Ranking
Figura 2 : progettazione generale delle classi della pipeline TensorFlow Ranking.

Costruttore di modelli

Il codice boilerplate relativo alla costruzione del modello Keras è integrato in AbstractModelBuilder , che viene passato a AbstractPipeline e chiamato all'interno della pipeline per costruire il modello nell'ambito della strategia. Ciò è mostrato nella Figura 1. I metodi di classe sono definiti nella classe base astratta.

class AbstractModelBuilder:
  def __init__(self, mask_feature_name, name):

  @abstractmethod
  def create_inputs(self):
    // To create tf.keras.Input. Abstract method, to be overridden.
    ...
  @abstractmethod
  def preprocess(self, context_inputs, example_inputs, mask):
    // To preprocess input features. Abstract method, to be overridden.
    ...
  @abstractmethod
  def score(self, context_features, example_features, mask):
    // To score based on preprocessed features. Abstract method, to be overridden.
    ...
  def build(self):
    context_inputs, example_inputs, mask = self.create_inputs()
    context_features, example_features = self.preprocess(
        context_inputs, example_inputs, mask)
    logits = self.score(context_features, example_features, mask)
    return tf.keras.Model(inputs=..., outputs=logits, name=self._name)

Puoi direttamente sottoclassare AbstractModelBuilder e sovrascrivere con i metodi concreti per la personalizzazione, come

class MyModelBuilder(AbstractModelBuilder):
  def create_inputs(self, ...):
  ...

Allo stesso tempo, dovresti utilizzare ModelBuilder con funzionalità di input, trasformazioni di preelaborazione e funzioni di punteggio specificate come input di funzione input_creator , preprocessor e scorer nella classe init anziché in sottoclassi.

class ModelBuilder(AbstractModelBuilder):
  def __init__(self, input_creator, preprocessor, scorer, mask_feature_name, name):
  ...

Per ridurre i tempi di creazione di questi input, vengono fornite le classi di funzione tfr.keras.model.InputCreator per input_creator , tfr.keras.model.Preprocessor per preprocessor e tfr.keras.model.Scorer per scorer , insieme alle sottoclassi concrete tfr.keras.model.FeatureSpecInputCreator , tfr.keras.model.TypeSpecInputCreator , tfr.keras.model.PreprocessorWithSpec , tfr.keras.model.UnivariateScorer , tfr.keras.model.DNNScorer e tfr.keras.model.GAMScorer . Questi dovrebbero coprire la maggior parte dei casi d'uso comuni.

Tieni presente che queste classi di funzioni sono classi Keras, quindi non è necessaria la serializzazione. La creazione di sottoclassi è il modo consigliato per personalizzarli.

DatasetBuilder

La classe DatasetBuilder raccoglie il boilerplate relativo al set di dati. I dati vengono passati alla Pipeline e chiamati a servire i set di dati di addestramento e convalida e a definire le firme di servizio per i modelli salvati. Come mostrato nella Figura 1, i metodi DatasetBuilder sono definiti nella classe base tfr.keras.pipeline.AbstractDatasetBuilder ,

class AbstractDatasetBuilder:

  @abstractmethod
  def build_train_dataset(self, *arg, **kwargs):
    // To return the training dataset.
    ...
  @abstractmethod
  def build_valid_dataset(self, *arg, **kwargs):
    // To return the validation dataset.
    ...
  @abstractmethod
  def build_signatures(self, *arg, **kwargs):
    // To build the signatures to export saved model.
    ...

In una classe DatasetBuilder concreta, è necessario implementare build_train_datasets , build_valid_datasets e build_signatures .

Viene inoltre fornita una classe concreta che crea set di dati da feature_spec :

class BaseDatasetBuilder(AbstractDatasetBuilder):

  def __init__(self, context_feature_spec, example_feature_spec,
               training_only_example_spec,
               mask_feature_name, hparams,
               training_only_context_spec=None):
    // Specify label and weight specs in training_only_example_spec.
    ...
  def _features_and_labels(self, features):
    // To split the labels and weights from input features.
    ...

  def _build_dataset(self, ...):
    return tfr.data.build_ranking_dataset(
        context_feature_spec+training_only_context_spec,
        example_feature_spec+training_only_example_spec, mask_feature_name, ...)

  def build_train_dataset(self):
    return self._build_dataset(...)

  def build_valid_dataset(self):
    return self._build_dataset(...)

  def build_signatures(self, model):
    return saved_model.Signatures(model, context_feature_spec,
                                  example_feature_spec, mask_feature_name)()

Gli hparams utilizzati nel DatasetBuilder sono specificati nella classe dati tfr.keras.pipeline.DatasetHparams .

Conduttura

La Ranking Pipeline si basa sulla classe tfr.keras.pipeline.AbstractPipeline :

class AbstractPipeline:

  @abstractmethod
  def build_loss(self):
    // Returns a tf.keras.losses.Loss or a dict of Loss. To be overridden.
    ...
  @abstractmethod
  def build_metrics(self):
    // Returns a list of evaluation metrics. To be overridden.
    ...
  @abstractmethod
  def build_weighted_metrics(self):
    // Returns a list of weighted metrics. To be overridden.
    ...
  @abstractmethod
  def train_and_validate(self, *arg, **kwargs):
    // Main function to run the training pipeline. To be overridden.
    ...

Viene inoltre fornita una classe pipeline concreta che addestra il modello con diversi tf.distribute.strategy compatibili con model.fit :

class ModelFitPipeline(AbstractPipeline):

  def __init__(self, model_builder, dataset_builder, hparams):
    ...
  def build_callbacks(self):
    // Builds callbacks used in model.fit. Override for customized usage.
    ...
  def export_saved_model(self, model, export_to, checkpoint=None):
    if checkpoint:
      model.load_weights(checkpoint)
    model.save(export_to, signatures=dataset_builder.build_signatures(model))

  def train_and_validate(self, verbose=0):
    with self._strategy.scope():
      model = model_builder.build()
      model.compile(
          optimizer,
          loss=self.build_loss(),
          metrics=self.build_metrics(),
          loss_weights=self.hparams.loss_weights,
          weighted_metrics=self.build_weighted_metrics())
      train_dataset, valid_dataset = (
          dataset_builder.build_train_dataset(),
          dataset_builder.build_valid_dataset())
      model.fit(
          x=train_dataset,
          validation_data=valid_dataset,
          callbacks=self.build_callbacks(),
          verbose=verbose)
      self.export_saved_model(model, export_to=model_output_dir)

Gli hparams utilizzati in tfr.keras.pipeline.ModelFitPipeline sono specificati nella classe dati tfr.keras.pipeline.PipelineHparams . Questa classe ModelFitPipeline è sufficiente per la maggior parte dei casi d'uso TF Ranking. I client possono facilmente sottoclassarlo per scopi specifici.

Supporto alla strategia distribuita

Fare riferimento alla formazione distribuita per un'introduzione dettagliata delle strategie distribuite supportate da TensorFlow. Attualmente, la pipeline TensorFlow Ranking supporta tf.distribute.MirroredStrategy (impostazione predefinita), tf.distribute.TPUStrategy , tf.distribute.MultiWorkerMirroredStrategy e tf.distribute.ParameterServerStrategy . La strategia speculare è compatibile con la maggior parte dei sistemi a macchina singola. Imposta strategy su None per nessuna strategia distribuita.

In generale, MirroredStrategy funziona per modelli relativamente piccoli sulla maggior parte dei dispositivi con opzioni CPU e GPU. MultiWorkerMirroredStrategy funziona per modelli di grandi dimensioni che non rientrano in un lavoratore. ParameterServerStrategy esegue la formazione asincrona e richiede la disponibilità di più lavoratori. TPUStrategy è ideale per modelli e big data di grandi dimensioni quando sono disponibili le TPU, tuttavia è meno flessibile in termini di forme tensoriali che può gestire.

Domande frequenti

  1. Il set minimo di componenti per l'utilizzo di RankingPipeline
    Vedi il codice di esempio sopra.

  2. E se avessi il mio model Keras?
    Per essere addestrato con le strategie tf.distribute , model deve essere costruito con tutte le variabili addestrabili definite in strategic.scope(). Quindi avvolgi il tuo modello in ModelBuilder come,

class MyModelBuilder(AbstractModelBuilder):
  def __init__(self, model, context_feature_names, example_feature_names,
               mask_feature_name, name):
    super().__init__(mask_feature_name, name)
    self._model = model
    self._context_feature_names = context_feature_names
    self._example_feature_names = example_feature_names

  def create_inputs(self):
    inputs = self._model.input
    context_inputs = {inputs[name] for name in self._context_feature_names}
    example_inputs = {inputs[name] for name in self._example_feature_names}
    mask = inputs[self._mask_feature_name]
    return context_inputs, example_inputs, mask

  def preprocess(self, context_inputs, example_inputs, mask):
    return context_inputs, example_inputs, mask

  def score(self, context_features, example_features, mask):
    inputs = dict(
        list(context_features.items()) + list(example_features.items()) +
        [(self._mask_feature_name, mask)])
    return self._model(inputs)

model_builder = MyModelBuilder(model, context_feature_names, example_feature_names,
                               mask_feature_name, "my_model")

Quindi inserisci questo model_builder nella pipeline per ulteriore formazione.