Addestramento distribuito

La formazione distribuita è un tipo di formazione modello in cui i requisiti delle risorse di calcolo (ad esempio, CPU, RAM) sono distribuiti tra più computer. La formazione distribuita consente di addestrarsi più velocemente e su set di dati più grandi (fino a qualche miliardo di esempi).

L'addestramento distribuito è utile anche per l'ottimizzazione automatizzata degli iperparametri in cui più modelli vengono addestrati in parallelo.

In questo documento imparerai come:

  • Addestra un modello TF-DF utilizzando l'addestramento distribuito.
  • Ottimizza gli iperparametri di un modello TF-DF utilizzando il training distribuito.

Limitazioni

A partire da ora, la formazione distribuita è supportata per:

  • Addestramento dei modelli Gradient Boosted Trees con tfdf.keras.DistributedGradientBoostedTreesModel . I modelli Distributed Gradient Boosted Trees sono equivalenti alle loro controparti non distribuite.
  • Ricerca iperparametrica per qualsiasi tipo di modello TF-DF.

Come abilitare la formazione distribuita

In questa sezione sono elencati i passaggi per abilitare la formazione distribuita. Per esempi completi, vedere la sezione successiva.

Ambito ParametroServerStrategy

Il modello e il set di dati sono definiti in un ambito ParameterServerStrategy .

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

Formato del set di dati

Come per la formazione non distribuita, i set di dati possono essere forniti come

  1. Un set di dati distribuito tensorflow finito, o
  2. un percorso ai file del set di dati utilizzando uno dei formati del set di dati compatibili .

L'utilizzo di file partizionati è notevolmente più semplice rispetto all'utilizzo dell'approccio del set di dati distribuiti tensorflow finito (1 riga contro ~20 righe di codice). Tuttavia, solo l'approccio del set di dati tensorflow supporta la pre-elaborazione TensorFlow. Se la pipeline non contiene alcuna pre-elaborazione, è consigliata l'opzione del set di dati partizionato.

In entrambi i casi, il set di dati deve essere suddiviso in più file per distribuire la lettura del set di dati in modo efficiente.

Addetti all'installazione

Un processo principale è il programma che esegue il codice Python che definisce il modello TensorFlow. Questo processo non esegue calcoli pesanti. Il calcolo effettivo della formazione viene effettuato dai lavoratori . I lavoratori sono processi che eseguono un server di parametri TensorFlow.

Il capo dovrebbe essere configurato con l'indirizzo IP dei lavoratori. Questo può essere fatto utilizzando la variabile di ambiente TF_CONFIG o creando un ClusterResolver . Per ulteriori dettagli, consulta Formazione sul server dei parametri con ParametroServerStrategy .

ParametroServerStrategy di TensorFlow definisce due tipi di lavoratori: "lavoratori" e "server dei parametri". TensorFlow richiede che venga istanziato almeno un tipo di lavoratore per ciascun tipo. Tuttavia, TF-DF utilizza solo "lavoratori". Pertanto, è necessario istanziare un "server dei parametri" ma non verrà utilizzato da TF-DF. Ad esempio, la configurazione di un allenamento TF-DF potrebbe apparire come segue:

  • 1 capo
  • 50 lavoratori
  • 1 Server dei parametri

I lavoratori devono accedere alle operazioni di formazione personalizzate di TensorFlow Decision Forests. Sono disponibili due opzioni per abilitare l'accesso:

  1. Utilizzare il server dei parametri C++ TF-DF preconfigurato //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server .
  2. Crea un server di parametri chiamando tf.distribute.Server() . In questo caso, TF-DF dovrebbe essere importato import tensorflow_decision_forests .

Esempi

Questa sezione mostra esempi completi di configurazioni di formazione distribuita. Per ulteriori esempi, controlla gli unit test TF-DF .

Esempio: formazione distribuita sul percorso del set di dati

Dividi il tuo set di dati in una serie di file partizionati utilizzando uno dei formati di set di dati compatibili . Si consiglia di denominare i file come segue: /path/to/dataset/train-<5 digit index>-of-<total files> , ad esempio

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

Per la massima efficienza, il numero di file dovrebbe essere almeno 10 volte il numero di lavoratori. Ad esempio, se ti stai formando con 100 lavoratori, assicurati che il set di dati sia diviso in almeno 1000 file.

È quindi possibile fare riferimento ai file con un'espressione di sharding come:

  • /percorso/del/set di dati/treno@1000
  • /percorso/del/set di dati/treno@*

La formazione distribuita viene eseguita come segue. In questo esempio, il set di dati viene archiviato come TFRecord di TensorFlow Esempi (definito dalla chiave tfrecord+tfe ).

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

Esempio: addestramento distribuito su un set di dati distribuito TensorFlow finito

TF-DF prevede un set di dati TensorFlow distribuito e suddiviso in partizioni di lavoratori finiti:

  • Distribuito : un set di dati non distribuito è racchiuso in strategy.distribute_datasets_from_function .
  • finito : il set di dati dovrebbe leggere ogni esempio esattamente una volta. Il set di dati non deve contenere istruzioni repeat .
  • lavoratore-sharded : ogni lavoratore deve leggere una parte separata del set di dati.

Ecco un esempio:

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

Esempio: ottimizzazione distribuita degli iperparametri su un percorso del set di dati

L'ottimizzazione distribuita degli iperparametri su un percorso del set di dati è simile all'addestramento distribuito. L'unica differenza è che questa opzione è compatibile con i modelli non distribuiti. Ad esempio, è possibile distribuire l'ottimizzazione degli iperparametri del modello Gradient Boosted Trees (non distribuito).

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

Esempio: test unitario

Per eseguire test unitari di formazione distribuita, è possibile creare processi di lavoro simulati. Per ulteriori informazioni, vedere il metodo _create_in_process_tf_ps_cluster negli unit test TF-DF .