Comience con la validación de datos de TensorFlow

TensorFlow Data Validation (TFDV) puede analizar datos de entrenamiento y servicio para:

La API central admite cada parte de la funcionalidad, con métodos convenientes que se basan en la parte superior y se pueden llamar en el contexto de los cuadernos.

Cálculo de estadísticas de datos descriptivos

TFDV puede calcular estadísticas descriptivas que brindan una descripción general rápida de los datos en términos de las características que están presentes y las formas de sus distribuciones de valor. Herramientas como Facets Overview pueden proporcionar una visualización sucinta de estas estadísticas para facilitar la navegación.

Por ejemplo, suponga que path apunta a un archivo en formato TFRecord (que contiene registros de tipo tensorflow.Example ). El siguiente fragmento ilustra el cálculo de estadísticas utilizando TFDV:

    stats = tfdv.generate_statistics_from_tfrecord(data_location=path)

El valor devuelto es un búfer de protocolo DatasetFeatureStatisticsList . El cuaderno de ejemplo contiene una visualización de las estadísticas utilizando Facets Overview :

    tfdv.visualize_statistics(stats)

Captura de pantalla de visualización de estadísticas

El ejemplo anterior asume que los datos se almacenan en un archivo TFRecord . TFDV también admite el formato de entrada CSV, con extensibilidad para otros formatos comunes. Puede encontrar los decodificadores de datos disponibles aquí . Además, TFDV proporciona la función de utilidad tfdv.generate_statistics_from_dataframe para usuarios con datos en memoria representados como un DataFrame de pandas.

Además de calcular un conjunto predeterminado de estadísticas de datos, TFDV también puede calcular estadísticas para dominios semánticos (p. ej., imágenes, texto). Para habilitar el cálculo de estadísticas de dominio semántico, pase un objeto tfdv.StatsOptions con enable_semantic_domain_stats establecido en True a tfdv.generate_statistics_from_tfrecord .

Corriendo en la nube de Google

Internamente, TFDV utiliza el marco de procesamiento de datos en paralelo de Apache Beam para escalar el cálculo de estadísticas en grandes conjuntos de datos. Para las aplicaciones que desean integrarse más profundamente con TFDV (por ejemplo, adjuntar la generación de estadísticas al final de una canalización de generación de datos, generar estadísticas para datos en formato personalizado ), la API también expone un Beam PTransform para la generación de estadísticas.

Para ejecutar TFDV en Google Cloud, el archivo de rueda de TFDV debe descargarse y proporcionarse a los trabajadores de Dataflow. Descargue el archivo de la rueda al directorio actual de la siguiente manera:

pip download tensorflow_data_validation \
  --no-deps \
  --platform manylinux2010_x86_64 \
  --only-binary=:all:

El siguiente fragmento muestra un ejemplo de uso de TFDV en Google Cloud:


import tensorflow_data_validation as tfdv
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

PROJECT_ID = ''
JOB_NAME = ''
GCS_STAGING_LOCATION = ''
GCS_TMP_LOCATION = ''
GCS_DATA_LOCATION = ''
# GCS_STATS_OUTPUT_PATH is the file path to which to output the data statistics
# result.
GCS_STATS_OUTPUT_PATH = ''

PATH_TO_WHL_FILE = ''


# Create and set your PipelineOptions.
options = PipelineOptions()

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = GCS_STAGING_LOCATION
google_cloud_options.temp_location = GCS_TMP_LOCATION
options.view_as(StandardOptions).runner = 'DataflowRunner'

setup_options = options.view_as(SetupOptions)
# PATH_TO_WHL_FILE should point to the downloaded tfdv wheel file.
setup_options.extra_packages = [PATH_TO_WHL_FILE]

tfdv.generate_statistics_from_tfrecord(GCS_DATA_LOCATION,
                                       output_path=GCS_STATS_OUTPUT_PATH,
                                       pipeline_options=options)

En este caso, el prototipo de estadísticas generado se almacena en un archivo TFRecord escrito en GCS_STATS_OUTPUT_PATH .

NOTA Al llamar a cualquiera de las funciones tfdv.generate_statistics_... (p. ej., tfdv.generate_statistics_from_tfrecord ) en Google Cloud, debe proporcionar una output_path . Especificar Ninguno puede causar un error.

Inferir un esquema sobre los datos

El esquema describe las propiedades esperadas de los datos. Algunas de estas propiedades son:

  • qué características se espera que estén presentes
  • su tipo
  • el número de valores para una característica en cada ejemplo
  • la presencia de cada característica en todos los ejemplos
  • los dominios de características esperados.

En resumen, el esquema describe las expectativas de datos "correctos" y, por lo tanto, puede usarse para detectar errores en los datos (descritos a continuación). Además, el mismo esquema se puede usar para configurar TensorFlow Transform para transformaciones de datos. Tenga en cuenta que se espera que el esquema sea bastante estático, por ejemplo, varios conjuntos de datos pueden ajustarse al mismo esquema, mientras que las estadísticas (descritas anteriormente) pueden variar según el conjunto de datos.

Dado que escribir un esquema puede ser una tarea tediosa, especialmente para conjuntos de datos con muchas funciones, TFDV proporciona un método para generar una versión inicial del esquema basada en las estadísticas descriptivas:

    schema = tfdv.infer_schema(stats)

En general, TFDV utiliza heurísticas conservadoras para inferir propiedades de datos estables a partir de las estadísticas a fin de evitar el sobreajuste del esquema al conjunto de datos específico. Se recomienda encarecidamente revisar el esquema inferido y refinarlo según sea necesario , para capturar cualquier conocimiento de dominio sobre los datos que la heurística de TFDV podría haber pasado por alto.

De manera predeterminada, tfdv.infer_schema infiere la forma de cada característica requerida, si value_count.min es igual a value_count.max para la característica. Establezca el argumento infer_feature_shape en False para deshabilitar la inferencia de forma.

El esquema en sí se almacena como un búfer de protocolo de esquema y, por lo tanto, se puede actualizar/editar utilizando la API de búfer de protocolo estándar. TFDV también proporciona algunos métodos de utilidad para facilitar estas actualizaciones. Por ejemplo, suponga que el esquema contiene la siguiente estrofa para describir una característica de cadena requerida payment_type que toma un solo valor:

feature {
  name: "payment_type"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  domain: "payment_type"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}

Para marcar que la función debe completarse en al menos el 50% de los ejemplos:

    tfdv.get_feature(schema, 'payment_type').presence.min_fraction = 0.5

El cuaderno de ejemplo contiene una visualización simple del esquema como una tabla, enumerando cada característica y sus características principales codificadas en el esquema.

Captura de pantalla de la visualización del esquema

Comprobación de los datos en busca de errores.

Dado un esquema, es posible verificar si un conjunto de datos se ajusta a las expectativas establecidas en el esquema o si existen anomalías en los datos . Puede verificar sus datos en busca de errores (a) en conjunto en un conjunto de datos completo comparando las estadísticas del conjunto de datos con el esquema, o (b) verificando errores por ejemplo.

Hacer coincidir las estadísticas del conjunto de datos con un esquema

Para verificar errores en el agregado, TFDV compara las estadísticas del conjunto de datos con el esquema y marca cualquier discrepancia. Por ejemplo:

    # Assume that other_path points to another TFRecord file
    other_stats = tfdv.generate_statistics_from_tfrecord(data_location=other_path)
    anomalies = tfdv.validate_statistics(statistics=other_stats, schema=schema)

El resultado es una instancia del búfer del protocolo de anomalías y describe los errores en los que las estadísticas no concuerdan con el esquema. Por ejemplo, suponga que los datos en other_path contienen ejemplos con valores para la función payment_type fuera del dominio especificado en el esquema.

Esto produce una anomalía.

   payment_type  Unexpected string values  Examples contain values missing from the schema: Prcard (<1%).

lo que indica que se encontró un valor fuera de dominio en las estadísticas en < 1% de los valores de características.

Si se esperaba esto, entonces el esquema se puede actualizar de la siguiente manera:

   tfdv.get_domain(schema, 'payment_type').value.append('Prcard')

Si la anomalía realmente indica un error de datos, los datos subyacentes deben corregirse antes de usarlos para el entrenamiento.

Los diversos tipos de anomalías que puede detectar este módulo se enumeran aquí .

El cuaderno de ejemplo contiene una visualización simple de las anomalías en forma de tabla, enumera las funciones en las que se detectan errores y una breve descripción de cada error.

Captura de pantalla de anomalías

Comprobación de errores por ejemplo

TFDV también ofrece la opción de validar los datos por ejemplo, en lugar de comparar las estadísticas de todo el conjunto de datos con el esquema. TFDV proporciona funciones para validar datos por ejemplo y luego generar estadísticas de resumen para los ejemplos anómalos encontrados. Por ejemplo:

   options = tfdv.StatsOptions(schema=schema)
   anomalous_example_stats = tfdv.validate_examples_in_tfrecord(
       data_location=input, stats_options=options)

El anomalous_example_stats que devuelve validate_examples_in_tfrecord es un búfer de protocolo DatasetFeatureStatisticsList en el que cada conjunto de datos consiste en el conjunto de ejemplos que exhiben una anomalía particular. Puede usar esto para determinar la cantidad de ejemplos en su conjunto de datos que exhiben una anomalía dada y las características de esos ejemplos.

Entornos de esquema

De forma predeterminada, las validaciones asumen que todos los conjuntos de datos en una tubería se adhieren a un solo esquema. En algunos casos, es necesario introducir ligeras variaciones en el esquema, por ejemplo, las características utilizadas como etiquetas son necesarias durante el entrenamiento (y deben validarse), pero faltan durante el servicio.

Los entornos se pueden utilizar para expresar dichos requisitos. En particular, las características del esquema se pueden asociar con un conjunto de entornos utilizando default_environment, in_environment y not_in_environment.

Por ejemplo, si la función de sugerencias se usa como etiqueta en el entrenamiento, pero falta en los datos de servicio. Sin el entorno especificado, aparecerá como una anomalía.

    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    serving_anomalies = tfdv.validate_statistics(serving_stats, schema)

Captura de pantalla de anomalías en el servicio

Para solucionar esto, debemos configurar el entorno predeterminado para que todas las funciones sean tanto "ENTRENAMIENTO" como "SERVICIO", y excluir la función "consejos" del entorno de SERVICIO.

    # All features are by default in both TRAINING and SERVING environments.
    schema.default_environment.append('TRAINING')
    schema.default_environment.append('SERVING')

    # Specify that 'tips' feature is not in SERVING environment.
    tfdv.get_feature(schema, 'tips').not_in_environment.append('SERVING')

    serving_anomalies_with_env = tfdv.validate_statistics(
        serving_stats, schema, environment='SERVING')

Comprobación de sesgo y deriva de datos

Además de verificar si un conjunto de datos se ajusta a las expectativas establecidas en el esquema, TFDV también proporciona funcionalidades para detectar:

  • sesgo entre el entrenamiento y el servicio de datos
  • deriva entre diferentes días de datos de entrenamiento

TFDV realiza esta verificación comparando las estadísticas de diferentes conjuntos de datos en función de los comparadores de deriva/sesgo especificados en el esquema. Por ejemplo, para verificar si hay algún sesgo entre la característica 'pago_tipo' dentro del conjunto de datos de entrenamiento y servicio:

    # Assume we have already generated the statistics of training dataset, and
    # inferred a schema from it.
    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    # Add a skew comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering skew anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').skew_comparator.infinity_norm.threshold = 0.01
    skew_anomalies = tfdv.validate_statistics(
        statistics=train_stats, schema=schema, serving_statistics=serving_stats)

NOTA La norma L-infinity solo detectará sesgos para las características categóricas. En lugar de especificar un umbral infinity_norm , especificar un umbral jensen_shannon_divergence en skew_comparator detectaría sesgos para características numéricas y categóricas.

Lo mismo con verificar si un conjunto de datos se ajusta a las expectativas establecidas en el esquema, el resultado también es una instancia del búfer del protocolo de anomalías y describe cualquier sesgo entre los conjuntos de datos de entrenamiento y servicio. Por ejemplo, suponga que los datos de publicación contienen significativamente más ejemplos con la característica payement_type con valor Cash , esto produce una anomalía de sesgo

   payment_type  High L-infinity distance between serving and training  The L-infinity distance between serving and training is 0.0435984 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: Cash

Si la anomalía realmente indica un sesgo entre el entrenamiento y el servicio de datos, entonces se necesita más investigación, ya que esto podría tener un impacto directo en el rendimiento del modelo.

El bloc de notas de ejemplo contiene un ejemplo sencillo de comprobación de anomalías basadas en sesgos.

La detección de la deriva entre diferentes días de datos de entrenamiento se puede hacer de manera similar

    # Assume we have already generated the statistics of training dataset for
    # day 2, and inferred a schema from it.
    train_day1_stats = tfdv.generate_statistics_from_tfrecord(data_location=train_day1_data_path)
    # Add a drift comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering drift anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').drift_comparator.infinity_norm.threshold = 0.01
    drift_anomalies = tfdv.validate_statistics(
        statistics=train_day2_stats, schema=schema, previous_statistics=train_day1_stats)

NOTA La norma L-infinity solo detectará sesgos para las características categóricas. En lugar de especificar un umbral infinity_norm , especificar un umbral jensen_shannon_divergence en skew_comparator detectaría sesgos para características numéricas y categóricas.

Escribir conector de datos personalizado

Para calcular estadísticas de datos, TFDV proporciona varios métodos convenientes para manejar datos de entrada en varios formatos (por ejemplo, TFRecord of tf.train.Example , CSV, etc.). Si su formato de datos no está en esta lista, debe escribir un conector de datos personalizado para leer datos de entrada y conectarlo con la API principal de TFDV para calcular estadísticas de datos.

La API principal de TFDV para calcular estadísticas de datos es Beam PTransform que toma una PCollection de lotes de ejemplos de entrada (un lote de ejemplos de entrada se representa como un Arrow RecordBatch) y genera una PCollection que contiene un solo búfer de protocolo DatasetFeatureStatisticsList .

Una vez que haya implementado el conector de datos personalizado que agrupa sus ejemplos de entrada en un Arrow RecordBatch, debe conectarlo con la API tfdv.GenerateStatistics para calcular las estadísticas de datos. Tome TFRecord de tf.train.Example por ejemplo. tfx_bsl proporciona el conector de datos TFExampleRecord y, a continuación, se muestra un ejemplo de cómo conectarlo con la API tfdv.GenerateStatistics .

import tensorflow_data_validation as tfdv
from tfx_bsl.public import tfxio
import apache_beam as beam
from tensorflow_metadata.proto.v0 import statistics_pb2

DATA_LOCATION = ''
OUTPUT_LOCATION = ''

with beam.Pipeline() as p:
    _ = (
    p
    # 1. Read and decode the data with tfx_bsl.
    | 'TFXIORead' >> (
          tfxio.TFExampleRecord(
              file_pattern=[DATA_LOCATION],
              telemetry_descriptors=['my', 'tfdv']).BeamSource())
    # 2. Invoke TFDV `GenerateStatistics` API to compute the data statistics.
    | 'GenerateStatistics' >> tfdv.GenerateStatistics()
    # 3. Materialize the generated data statistics.
    | 'WriteStatsOutput' >> WriteStatisticsToTFRecord(OUTPUT_LOCATION))

Cálculo de estadísticas sobre segmentos de datos

TFDV se puede configurar para calcular estadísticas sobre segmentos de datos. El corte se puede habilitar proporcionando funciones de corte que toman un Arrow RecordBatch y generan una secuencia de tuplas de forma (slice key, record batch) . TFDV proporciona una manera fácil de generar funciones de corte basadas en valores de características que se pueden proporcionar como parte de tfdv.StatsOptions al calcular estadísticas.

Cuando el corte está habilitado, el prototipo de DatasetFeatureStatisticsList de salida contiene varios prototipos de DatasetFeatureStatistics , uno para cada segmento. Cada segmento se identifica con un nombre único que se establece como el nombre del conjunto de datos en el protocolo DatasetFeatureStatistics . De forma predeterminada, TFDV calcula estadísticas para el conjunto de datos general además de los sectores configurados.

import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils import slicing_util

# Slice on country feature (i.e., every unique value of the feature).
slice_fn1 = slicing_util.get_feature_value_slicer(features={'country': None})

# Slice on the cross of country and state feature (i.e., every unique pair of
# values of the cross).
slice_fn2 = slicing_util.get_feature_value_slicer(
    features={'country': None, 'state': None})

# Slice on specific values of a feature.
slice_fn3 = slicing_util.get_feature_value_slicer(
    features={'age': [10, 50, 70]})

stats_options = tfdv.StatsOptions(
    slice_functions=[slice_fn1, slice_fn2, slice_fn3])