Treinamento distribuído

O treinamento distribuído é um tipo de treinamento de modelo em que os requisitos de recursos computacionais (por exemplo, CPU, RAM) são distribuídos entre vários computadores. O treinamento distribuído permite treinar mais rápido e em conjuntos de dados maiores (até alguns bilhões de exemplos).

O treinamento distribuído também é útil para otimização automatizada de hiperparâmetros, onde vários modelos são treinados em paralelo.

Neste documento você aprenderá como:

  • Treine um modelo TF-DF usando treinamento distribuído.
  • Ajuste os hiperparâmetros de um modelo TF-DF usando treinamento distribuído.

Limitações

A partir de agora, o treinamento distribuído é compatível com:

  • Treinamento de modelos de Gradient Boosted Trees com tfdf.keras.DistributedGradientBoostedTreesModel . Os modelos Distributed Gradient Boosted Trees são equivalentes aos seus equivalentes não distribuídos.
  • Pesquisa de hiperparâmetros para qualquer tipo de modelo TF-DF.

Como habilitar o treinamento distribuído

Esta seção lista as etapas para habilitar o treinamento distribuído. Para exemplos completos, consulte a próxima seção.

Escopo ParameterServerStrategy

O modelo e o conjunto de dados são definidos em um escopo ParameterServerStrategy .

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

Formato do conjunto de dados

Tal como acontece com o treinamento não distribuído, os conjuntos de dados podem ser fornecidos como

  1. Um conjunto de dados distribuído de tensorflow finito, ou
  2. um caminho para os arquivos do conjunto de dados usando um dos formatos de conjunto de dados compatíveis .

Usar arquivos fragmentados é significativamente mais simples do que usar a abordagem de conjunto de dados distribuídos de tensorflow finito (1 linha versus aproximadamente 20 linhas de código). No entanto, apenas a abordagem do conjunto de dados tensorflow oferece suporte ao pré-processamento do TensorFlow. Se o seu pipeline não contiver nenhum pré-processamento, a opção de conjunto de dados fragmentado será recomendada.

Em ambos os casos, o conjunto de dados deve ser fragmentado em vários arquivos para distribuir a leitura do conjunto de dados de forma eficiente.

Trabalhadores de configuração

O processo principal é o programa que executa o código python que define o modelo TensorFlow. Este processo não está executando nenhuma computação pesada. O cálculo efetivo do treinamento é feito pelos trabalhadores . Trabalhadores são processos que executam um servidor de parâmetros do TensorFlow.

O chefe deverá estar configurado com o endereço IP dos trabalhadores. Isso pode ser feito usando a variável de ambiente TF_CONFIG ou criando um ClusterResolver . Consulte Treinamento de servidor de parâmetros com ParameterServerStrategy para obter mais detalhes.

ParameterServerStrategy do TensorFlow define dois tipos de trabalhadores: "trabalhadores" e "servidor de parâmetros". O TensorFlow exige que pelo menos um de cada tipo de trabalhador seja instanciado. Porém, o TF-DF utiliza apenas “trabalhadores”. Portanto, um "servidor de parâmetros" precisa ser instanciado, mas não será utilizado pelo TF-DF. Por exemplo, a configuração de um treinamento TF-DF pode ser a seguinte:

  • 1 Chefe
  • 50 trabalhadores
  • 1 Servidor de parâmetros

Os trabalhadores precisam de acesso às operações de treinamento personalizadas do TensorFlow Decision Forests. Existem duas opções para ativar o acesso:

  1. Use o servidor de parâmetros TF-DF C++ pré-configurado //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server .
  2. Crie um servidor de parâmetros chamando tf.distribute.Server() . Neste caso, TF-DF deve ser importado import tensorflow_decision_forests .

Exemplos

Esta seção mostra exemplos completos de configurações de treinamento distribuído. Para mais exemplos, verifique os testes unitários do TF-DF .

Exemplo: treinamento distribuído no caminho do conjunto de dados

Divida seu conjunto de dados em um conjunto de arquivos fragmentados usando um dos formatos de conjunto de dados compatíveis . Recomenda-se nomear os arquivos da seguinte forma: /path/to/dataset/train-<5 digit index>-of-<total files> , por exemplo

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

Para máxima eficiência, o número de arquivos deve ser pelo menos 10x o número de trabalhadores. Por exemplo, se você estiver treinando com 100 trabalhadores, certifique-se de que o conjunto de dados esteja dividido em pelo menos 1.000 arquivos.

Os arquivos podem então ser referenciados com uma expressão de fragmentação como:

  • /caminho/para/conjunto de dados/train@1000
  • /caminho/para/conjunto de dados/train@*

O treinamento distribuído é feito da seguinte maneira. Neste exemplo, o conjunto de dados é armazenado como um TFRecord de exemplos do TensorFlow (definido pela chave tfrecord+tfe ).

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

Exemplo: treinamento distribuído em um conjunto de dados distribuído finito do TensorFlow

TF-DF espera um conjunto de dados TensorFlow distribuído e finito e fragmentado por trabalhador:

  • Distribuído : um conjunto de dados não distribuído é agrupado em strategy.distribute_datasets_from_function .
  • finito : o conjunto de dados deve ler cada exemplo exatamente uma vez. O conjunto de dados não deve conter instruções repeat .
  • trabalhador-sharded : cada trabalhador deve ler uma parte separada do conjunto de dados.

Aqui está um exemplo:

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

Exemplo: ajuste de hiperparâmetros distribuído em um caminho de conjunto de dados

O ajuste distribuído de hiperparâmetros em um caminho de conjunto de dados é semelhante ao treinamento distribuído. A única diferença é que esta opção é compatível com modelos não distribuídos. Por exemplo, você pode distribuir o ajuste de hiperparâmetros do modelo Gradient Boosted Trees (não distribuído).

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

Exemplo: teste de unidade

Para testar a unidade de treinamento distribuído, você pode criar processos de trabalho simulados. Consulte o método _create_in_process_tf_ps_cluster nos testes unitários do TF-DF para obter mais informações.