Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот | Запуск в Google Cloud Vertex AI Workbench |
В этом учебном пособии на основе записной книжки в качестве источника данных для обучения модели машинного обучения будет использоваться Google Cloud BigQuery . Конвейер машинного обучения будет построен с использованием TFX и запущен в Google Cloud Vertex Pipelines.
Этот блокнот основан на конвейере TFX, который мы создали в учебнике Simple TFX Pipeline for Vertex Pipelines . Если вы еще не читали это руководство, прочтите его, прежде чем приступить к работе с этой записной книжкой.
BigQuery — это бессерверное, масштабируемое и экономичное многооблачное хранилище данных, разработанное для гибкости бизнеса. TFX можно использовать для чтения обучающих данных из BigQuery и публикации обученной модели в BigQuery.
В этом руководстве мы будем использовать компонент BigQueryExampleGen
, который считывает данные из BigQuery в конвейеры TFX.
Эта записная книжка предназначена для работы в Google Colab или записных книжках на платформе AI . Если вы не используете один из них, вы можете просто нажать кнопку «Запустить в Google Colab» выше.
Настраивать
Если вы выполнили учебник Simple TFX Pipeline for Vertex Pipelines Tutorial , у вас будет работающий проект GCP и корзина GCS, и это все, что нам нужно для этого руководства. Пожалуйста, сначала прочитайте предварительное руководство, если вы его пропустили.
Установить пакеты питона
Мы установим необходимые пакеты Python, включая TFX и KFP, для создания конвейеров ML и отправки заданий в Vertex Pipelines.
# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"
Вы перезапустили среду выполнения?
Если вы используете Google Colab, при первом запуске указанной выше ячейки необходимо перезапустить среду выполнения, нажав кнопку «RESTART RUNTIME» выше или используя меню «Runtime > Restart runtime...». Это связано с тем, как Colab загружает пакеты.
Если вы не используете Colab, вы можете перезапустить среду выполнения со следующей ячейкой.
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Войдите в Google для этого блокнота
Если вы используете этот блокнот в Colab, выполните аутентификацию с помощью своей учетной записи пользователя:
import sys
if 'google.colab' in sys.modules:
from google.colab import auth
auth.authenticate_user()
Если вы используете ноутбуки на платформе AI , выполните аутентификацию в Google Cloud перед запуском следующего раздела, запустив
gcloud auth login
в окне терминала (которое можно открыть через меню « Файл» > « Создать »). Это нужно сделать только один раз для каждого экземпляра записной книжки.
Проверьте версии пакетов.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1 TFX version: 1.6.0 KFP version: 1.8.11
Настройка переменных
Ниже мы настроим некоторые переменные, используемые для настройки конвейеров. Требуется следующая информация:
- Идентификатор и номер проекта GCP. См. Определение идентификатора и номера вашего проекта .
- Регион GCP для запуска трубопроводов. Дополнительные сведения о регионах, в которых доступны Vertex Pipelines, см. в руководстве по локациям Vertex AI .
- Сегмент Google Cloud Storage для хранения выходных данных конвейера.
Введите требуемые значения в ячейку ниже перед запуском .
GOOGLE_CLOUD_PROJECT = '' # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = '' # <--- ENTER THIS
GOOGLE_CLOUD_REGION = '' # <--- ENTER THIS
GCS_BUCKET_NAME = '' # <--- ENTER THIS
if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
from absl import logging
logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.
Настройте gcloud
для использования вашего проекта.
gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified. Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags] optional flags may be --help | --installation For detailed information on this command and its flags, run: gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'
# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery
По умолчанию Vertex Pipelines использует учетную запись службы виртуальной машины GCE по умолчанию в формате [project-number]-compute@developer.gserviceaccount.com
. Нам нужно дать разрешение на использование BigQuery этой учетной записи, чтобы получить доступ к BigQuery в конвейере. Мы добавим в учетную запись роль «Пользователь BigQuery».
!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
--member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
--role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified. Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags] optional flags may be --condition | --condition-from-file | --help For detailed information on this command and its flags, run: gcloud projects add-iam-policy-binding --help
См. документацию Vertex , чтобы узнать больше об учетных записях служб и конфигурации IAM.
Создать конвейер
Конвейеры TFX определяются с помощью API-интерфейсов Python, как мы это делали в учебнике Simple TFX Pipeline for Vertex Pipelines Tutorial . Ранее мы использовали CsvExampleGen
, который считывает данные из CSV-файла. В этом руководстве мы будем использовать компонент BigQueryExampleGen
, который считывает данные из BigQuery.
Подготовить запрос BigQuery
Мы будем использовать тот же набор данных Palmer Penguins . Однако мы будем считывать его из таблицы BigQuery tfx-oss-public.palmer_penguins.palmer_penguins
, которая заполняется с использованием того же CSV-файла.
Если вы используете Google Colab, вы можете напрямую изучить содержимое таблицы BigQuery.
# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5
Все признаки уже были нормализованы до 0~1, кроме species
, который является меткой. Мы построим классификационную модель, которая предсказывает species
пингвинов.
BigQueryExampleGen
требует запроса, чтобы указать, какие данные нужно извлечь. Поскольку мы будем использовать все поля всех строк таблицы, запрос будет довольно простым. Вы также можете указать имена полей и добавить условия WHERE
по мере необходимости в соответствии со стандартным синтаксисом SQL BigQuery .
QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"
Напишите код модели.
Мы будем использовать тот же код модели, что и в Simple TFX Pipeline Tutorial .
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _make_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _make_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
Скопируйте файл модуля в GCS, к которому можно получить доступ из компонентов конвейера. Поскольку обучение модели происходит в GCP, нам необходимо загрузить это определение модели.
В противном случае вы можете создать образ контейнера, включающий файл модуля, и использовать этот образ для запуска конвейера.
gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".
Напишите определение конвейера
Мы определим функцию для создания конвейера TFX. Нам нужно использовать BigQueryExampleGen
, который принимает query
в качестве аргумента. Еще одно изменение по сравнению с предыдущим уроком заключается в том, что нам нужно передать beam_pipeline_args
, который передается компонентам при их выполнении. Мы будем использовать beam_pipeline_args
для передачи дополнительных параметров в BigQuery.
from typing import List, Optional
def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
module_file: str, serving_model_dir: str,
beam_pipeline_args: Optional[List[str]],
) -> tfx.dsl.Pipeline:
"""Creates a TFX pipeline using BigQuery."""
# NEW: Query data in BigQuery as a data source.
example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
query=query)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a file destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components,
# NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
beam_pipeline_args=beam_pipeline_args)
Запустите конвейер на Vertex Pipelines.
Мы будем использовать Vertex Pipelines для запуска конвейера, как мы это делали в Simple TFX Pipeline for Vertex Pipelines Tutorial .
Нам также нужно передать beam_pipeline_args
для BigQueryExampleGen. Он включает такие конфигурации, как имя проекта GCP и временное хранилище для выполнения BigQuery.
import os
# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
'--project=' + GOOGLE_CLOUD_PROJECT,
'--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
]
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
query=QUERY,
module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
serving_model_dir=SERVING_MODEL_DIR,
beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))
Сгенерированный файл определения можно отправить с помощью клиента kfp.
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
display_name=PIPELINE_NAME)
job.run(sync=False)
Теперь вы можете посетить Vertex AI > Pipelines в Google Cloud Console , чтобы увидеть прогресс.