אימות נתונים באמצעות צינור TFX ואימות נתונים של TensorFlow

במדריך זה המבוסס על מחברת, ניצור ונפעיל צינורות TFX כדי לאמת נתוני קלט וליצור מודל ML. מחברת זו מבוססת על צינור TFX שבנינו פשוט הדרכת צינור TFX . אם עדיין לא קראת את המדריך הזה, עליך לקרוא אותו לפני שתמשיך עם מחברת זו.

המשימה הראשונה בכל פרויקט מדעי נתונים או ML היא להבין ולנקות את הנתונים, הכוללים:

  • הבנת סוגי הנתונים, ההפצות ומידע אחר (לדוגמה, ערך ממוצע או מספר ייחודיים) לגבי כל תכונה
  • יצירת סכימה ראשונית המתארת ​​את הנתונים
  • זיהוי חריגות וערכים חסרים בנתונים ביחס לסכימה נתונה

במדריך זה, ניצור שני צינורות TFX.

ראשית, ניצור צינור לניתוח מערך הנתונים ויצירת סכימה ראשונית של מערך הנתונים הנתון. צינור זה יכלול שני מרכיבים חדשים, StatisticsGen ו SchemaGen .

לאחר שתהיה לנו סכימה נכונה של הנתונים, ניצור צינור להכשרת מודל סיווג ML המבוסס על הצינור מהמדריך הקודם. בצנרת זו, נשתמש סכימה מהצנרת הראשונה מרכיב חדש, ExampleValidator , כדי לאמת את נתון הקלט.

שלושת הרכיבים החדשים, StatisticsGen, SchemaGen ו ExampleValidator, הם מרכיבים TFX לניתוח נתונים ואימות, והם מיושמים באמצעות אימות נתונים TensorFlow הספרייה.

אנא ראה הבנת TFX צנרת כדי ללמוד עוד על מושגים שונים TFX.


ראשית עלינו להתקין את חבילת TFX Python ולהוריד את מערך הנתונים בו נשתמש עבור המודל שלנו.

שדרוג פיפ

כדי להימנע משדרוג Pip במערכת בעת הפעלה מקומית, בדוק כדי לוודא שאנו פועלים ב-Colab. ניתן כמובן לשדרג מערכות מקומיות בנפרד.

  import colab
  !pip install --upgrade pip

התקן TFX

pip install -U tfx

הפעלת מחדש את זמן הריצה?

אם אתה משתמש ב-Google Colab, בפעם הראשונה שאתה מפעיל את התא שלמעלה, עליך להפעיל מחדש את זמן הריצה על ידי לחיצה מעל לחצן "התחל ריצה מחדש" או שימוש בתפריט "זמן ריצה > הפעל מחדש זמן ריצה...". זה בגלל האופן שבו קולאב טוען חבילות.

בדוק את גרסאות TensorFlow ו-TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0

הגדר משתנים

ישנם כמה משתנים המשמשים להגדרת צינור. אתה יכול להתאים אישית את המשתנים האלה כרצונך. כברירת מחדל, כל הפלט מהצינור ייווצר תחת הספרייה הנוכחית.

import os

# We will create two pipelines. One for schema generation and one for training.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

הכן נתונים לדוגמה

נוריד את מערך הנתונים לדוגמה לשימוש בצינור ה-TFX שלנו. בסיס הנתונים שאנו משתמשים הוא במערך הפינגווינים פאלמר אשר משמש גם אחרים דוגמאות TFX .

ישנן ארבע תכונות מספריות במערך הנתונים הזה:

  • culmen_length_mm
  • culmen_depth_mm
  • פליפר_length_mm
  • מסת_גוף_ג

כל התכונות כבר נורמלו לטווח [0,1]. אנחנו נבנינו מודל סיווג אשר חוזה את species של פינגווינים.

מכיוון שרכיב TFX ExampleGen קורא קלט מספריה, עלינו ליצור ספרייה ולהעתיק אליה את מערך הנתונים.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmp/tfx-datan3p7t1d2/data.csv', <http.client.HTTPMessage at 0x7f8d2f9f9110>)

עיין במהירות בקובץ ה-CSV.

head {_data_filepath}

אתה אמור להיות מסוגל לראות חמש עמודות תכונה. species הם אחת 0, 1 או 2, וכול תכונות האחרות צריכות ערכים בין 0 ל -1 ניצור צינור TFX לנתח במערך הזה.

צור סכמה ראשונית

צינורות TFX מוגדרים באמצעות ממשקי API של Python. אנו ניצור צינור ליצירת סכימה מדוגמאות הקלט באופן אוטומטי. סכימה זו יכולה להיבדק על ידי אדם ולהתאים לפי הצורך. לאחר סיום הסכימה ניתן להשתמש בה להדרכה ואימות דוגמה במשימות מאוחרות יותר.

בנוסף CsvExampleGen אשר משמש פשוט TFX צינור הדרכה , נשתמש StatisticsGen ו SchemaGen :

  • StatisticsGen מחשבת הסטטיסטיקה עבור בסיס הנתונים.
  • SchemaGen בוחן את הסטטיסטיקה ויוצר סכימת נתון ראשונית.

עיין במדריכים עבור כל רכיב או רכיבים TFX הדרכה כדי ללמוד יותר על רכיבים אלה.

כתוב הגדרת צינור

אנו מגדירים פונקציה ליצירת צינור TFX. Pipeline אובייקט מייצג צינור TFX אשר ניתן להפעיל באמצעות אחת ממערכות תזמור צינור תומך TFX.

def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline for schema generation."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(

  # NEW: Generates schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  components = [

  return tfx.dsl.Pipeline(

הפעל את הצינור

נשתמש LocalDagRunner כמו ההדרכה הקודמת.

אתה אמור לראות "INFO:absl:Component SchemaGen הסתיים." אם הצינור הסתיים בהצלחה.

נבחן את הפלט של הצינור כדי להבין את מערך הנתונים שלנו.

סקור את התפוקות של הצינור

כפי שמוסבר במדריך הקודם, צינור TFX מייצר שני סוגים של יציאות, חפצים ו metadata DB (MLMD) המכיל metadata של חפצים והוצאות להורג צינור. הגדרנו את מיקום הפלטים הללו בתאים שלעיל. כברירת מחדל, חפצים מאוחסנים תחת pipelines בספרייה metadata מאוחסן במסד נתונים SQLite תחת metadata בספרייה.

אתה יכול להשתמש בממשקי API של MLMD כדי לאתר את הפלטים הללו באופן פרוגרמטי. ראשית, נגדיר כמה פונקציות שירות כדי לחפש את חפצי הפלט שזה עתה נוצרו.

from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id,

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
    if visualization:

from tfx.orchestration.experimental.interactive import standard_visualizations

כעת נוכל לבחון את התפוקות מביצוע הצינור.

# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]
INFO:absl:MetadataStore with DB connection initialized

זה הזמן לבחון את התפוקות מכל רכיב. כפי שתואר לעיל, אימות נתוני Tensorflow (TFDV) משמשות StatisticsGen ו SchemaGen , ו TFDV גם מספק ויזואליזציה של היציאות מן הרכיבים הללו.

במדריך זה, נשתמש בשיטות העזר להדמיה ב-TFX המשתמשות ב-TFDV באופן פנימי כדי להציג את ההדמיה.

בדוק את הפלט מ-StatisticsGen

# docs-infra: no-execute

אתה יכול לראות נתונים סטטיסטיים שונים עבור נתוני הקלט. נתונים אלה מסופקים SchemaGen לבנות סכימה ראשונית של הנתונים באופן אוטומטי.

בדוק את הפלט מ- SchemaGen


סכימה זו מוסקת אוטומטית מהפלט של StatisticsGen. אתה אמור להיות מסוגל לראות 4 תכונות FLOAT ותכונת INT אחת.

ייצא את הסכימה לשימוש עתידי

עלינו לסקור ולחדד את הסכימה שנוצרה. יש להתמיד בסכימה שנבדקה כדי לשמש בצינורות הבאים לאימון מודל ML. במילים אחרות, ייתכן שתרצה להוסיף את קובץ הסכימה למערכת בקרת הגרסאות שלך למקרי שימוש בפועל. במדריך זה, פשוט נעתיק את הסכימה לנתיב מוגדר מראש של מערכת קבצים למען הפשטות.

import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)

קובץ הסכימה משתמש בפורמט טקסט חוצץ פרוטוקול ואת מופע של פרוטו סכימת Metadata TensorFlow .

print(f'Schema at {SCHEMA_PATH}-----')
!cat {SCHEMA_PATH}/*
Schema at schema-----
feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1

עליך להקפיד לסקור ואולי לערוך את הגדרת הסכימה לפי הצורך. במדריך זה, אנו פשוט נשתמש בסכימה שנוצרה ללא שינוי.

אימות דוגמאות קלט והכשרת מודל ML

אנחנו נחזור אל הצינור שיצרנו פשוט TFX צינור הדרכה , להכשיר מודל ML ולהשתמש הסכימה שנוצרה על כתיבת קוד הכשרה במודל.

אנחנו גם נוסיף ExampleValidator רכיב אשר יחפש אנומליות וערכים חסרים במערך נכנס לעניין הסכימה.

כתוב קוד אימון מודל

אנחנו צריכים לכתוב את קוד המודל כפי שעשינו פשוט TFX צינור הדרכה .

המודל עצמו זהה למדריך הקודם, אך הפעם נשתמש בסכימה שנוצרה מהצינור הקודם במקום לציין תכונות באופן ידני. רוב הקוד לא השתנה. ההבדל היחיד הוא שאנחנו לא צריכים לציין את השמות וסוגי התכונות בקובץ הזה. במקום זאת, אנו קוראים אותם מקובץ הסכימה.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

# We don't need to specify _FEATURE_KEYS and _FEATURE_SPEC any more.
# Those information can be read from the given schema file.

_LABEL_KEY = 'species'


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  return data_accessor.tf_dataset_factory(
          batch_size=batch_size, label_key=_LABEL_KEY),

def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

    A Keras Model.
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.

  # ++ Changed code: Uses all features in the schema except the label.
  feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]
  # ++ End of the changed code.

  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

    fn_args: Holds args used to train the model as name/value pairs.

  # ++ Changed code: Reads in schema file passed to the Trainer component.
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
  # ++ End of the changed code.

  train_dataset = _input_fn(
  eval_dataset = _input_fn(

  model = _build_keras_model(schema)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

כעת השלמת את כל שלבי ההכנה לבניית צינור TFX לאימון מודלים.

כתוב הגדרת צינור

נוסיף שני רכיבים חדשים, Importer ו ExampleValidator . היבואן מביא קובץ חיצוני לצינור TFX. במקרה זה, זהו קובץ המכיל הגדרת סכימה. ExampleValidator יבחן את נתוני הקלט ויאמת האם כל נתוני הקלט תואמים את סכימת הנתונים שסיפקנו.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline using predefined schema with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(

  # NEW: Import the schema.
  schema_importer = tfx.dsl.Importer(

  # NEW: Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      schema=schema_importer.outputs['result'],  # Pass the imported schema.

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(

  components = [

      # NEW: Following three components were added to the pipeline.


  return tfx.dsl.Pipeline(

הפעל את הצינור

אתה אמור לראות "INFO:absl:Pusher Component הסתיים." אם הצינור הסתיים בהצלחה.

בחן את התפוקות של הצינור

אימנו את מודל הסיווג לפינגווינים, וגם אימתנו את דוגמאות הקלט ברכיב ה-ExampleValidator. אנו יכולים לנתח את הפלט מ-ExampleValidator כפי שעשינו עם הצינור הקודם.

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(

with Metadata(metadata_connection_config) as metadata_handler:
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]
INFO:absl:MetadataStore with DB connection initialized

ניתן להמחיש גם אנומליות לדוגמה מ-ExampleValidator.


אתה אמור לראות "לא נמצאו חריגות" עבור כל פיצול של דוגמאות. מכיוון שהשתמשנו באותם נתונים ששימשו ליצירת הסכימה בצינור הזה, לא צפויה כאן חריגה. אם אתה מפעיל את הצינור הזה שוב ושוב עם נתונים נכנסים חדשים, ExampleValidator אמור להיות מסוגל למצוא אי התאמה בין הנתונים החדשים לסכימה הקיימת.

אם נמצאו חריגות כלשהן, תוכל לעיין בנתונים שלך כדי לבדוק אם דוגמאות כלשהן אינן עוקבות אחר ההנחות שלך. פלטים מרכיבים אחרים כמו StatisticsGen עשויים להיות שימושיים. עם זאת, כל חריגות שימצאו לא יחסמו ביצועים נוספים של צינור.

הצעדים הבאים

אתה יכול למצוא עוד משאבים על https://www.tensorflow.org/tfx/tutorials

אנא ראה הבנת TFX צנרת כדי ללמוד עוד על מושגים שונים TFX.