Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar cuaderno |
Descripción general
El objetivo de API Avro conjunto de datos es para cargar Avro datos formateados de forma nativa en TensorFlow como TensorFlow conjunto de datos . Avro es un sistema de serialización de datos similar a Protocol Buffers. Se usa ampliamente en Apache Hadoop, donde puede proporcionar un formato de serialización para datos persistentes y un formato de cable para la comunicación entre los nodos de Hadoop. Los datos de Avro son un formato de datos binarios compactado y orientado a filas. Se basa en un esquema que se almacena como un archivo JSON separado. Para la especificación de formato y esquema de Avro declaración, por favor consulte el manual oficial .
Paquete de instalación
Instale el paquete tensorflow-io requerido
pip install tensorflow-io
Importar paquetes
import tensorflow as tf
import tensorflow_io as tfio
Validar importaciones de tf y tfio
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0 tensorflow version: 2.5.0
Uso
Explore el conjunto de datos
Para el propósito de este tutorial, descarguemos el conjunto de datos Avro de muestra.
Descargue un archivo Avro de muestra:
curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 151 100 151 0 0 1268 0 --:--:-- --:--:-- --:--:-- 1268 100 369 100 369 0 0 1255 0 --:--:-- --:--:-- --:--:-- 1255 -rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro
Descargue el archivo de esquema correspondiente del archivo Avro de muestra:
curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 151 100 151 0 0 1247 0 --:--:-- --:--:-- --:--:-- 1247 100 271 100 271 0 0 780 0 --:--:-- --:--:-- --:--:-- 780 -rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc
En el ejemplo anterior, se creó un conjunto de datos Avro de prueba basado en el conjunto de datos mnist. El conjunto de datos original en formato mnist TFRecord se genera a partir del TF llamado conjunto de datos . Sin embargo, el conjunto de datos mnist es demasiado grande como conjunto de datos de demostración. Por motivos de simplicidad, la mayor parte se recortó y solo se conservaron los primeros registros. Por otra parte, el recorte adicional se hizo para image
de campo de conjunto de datos original y mnist asigna a features
de campo en Avro. Así el archivo Avro train.avro
tiene 4 registros, cada uno de los cuales tiene 3 campos: features
, que es una matriz de int, label
, un int o nula, y dataType
, una enumeración. Para ver el decodificado train.avro
(Nota el archivo de datos Avro original, no es legible como Avro es un formato compactado):
Instale el paquete requerido para leer el archivo Avro:
pip install avro
Para leer e imprimir un archivo Avro en un formato legible por humanos:
from avro.io import DatumReader
from avro.datafile import DataFileReader
import json
def print_avro(avro_file, max_record_num=None):
if max_record_num is not None and max_record_num <= 0:
return
with open(avro_file, 'rb') as avro_handler:
reader = DataFileReader(avro_handler, DatumReader())
record_count = 0
for record in reader:
record_count = record_count+1
print(record)
if max_record_num is not None and record_count == max_record_num:
break
print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'} {'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'} {'features': [0], 'label': 3, 'dataType': 'VALIDATION'} {'features': [1], 'label': 4, 'dataType': 'VALIDATION'}
Y el esquema de train.avro
que está representado por train.avsc
es un archivo con formato JSON. Para ver el train.avsc
:
def print_schema(avro_schema_file):
with open(avro_schema_file, 'r') as handle:
parsed = json.load(handle)
print(json.dumps(parsed, indent=4, sort_keys=True))
print_schema('train.avsc')
{ "fields": [ { "name": "features", "type": { "items": "int", "type": "array" } }, { "name": "label", "type": [ "int", "null" ] }, { "name": "dataType", "type": { "name": "dataTypes", "symbols": [ "TRAINING", "VALIDATION" ], "type": "enum" } } ], "name": "ImageDataset", "type": "record" }
Prepare el conjunto de datos
Cargar train.avro
como conjunto de datos TensorFlow con Avro API conjunto de datos:
features = {
'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}
schema = tf.io.gfile.GFile('train.avsc').read()
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=3,
num_epochs=1)
for record in dataset:
print(record['features[*]'])
print(record['label'])
print(record['dataType'])
print("--------------------")
SparseTensor(indices=tf.Tensor( [[0 0] [0 1] [0 2] [0 3] [0 4] [1 0] [1 1] [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64)) tf.Tensor([-100 2 3], shape=(3,), dtype=int32) tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string) -------------------- SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64)) tf.Tensor([4], shape=(1,), dtype=int32) tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string) --------------------
Las ejemplo convierte anteriores train.avro
en el conjunto de datos tensorflow. Cada elemento del conjunto de datos es un diccionario cuya clave es el nombre de la característica, el valor es el tensor denso o escaso convertido. Por ejemplo, convierte features
, label
, dataType
de campo a un VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), y FixedLenFeature (DenseTensor) respectivamente. Desde batch_size es 3, se coaccionar 3 registros de train.avro
en un solo elemento en el conjunto de datos resultado. Para el primer registro en train.avro
cuya etiqueta es nulo, lector de Avro lo reemplaza con el valor predeterminado especificado (-100). En este ejemplo, hay 4 registros en total en train.avro
. Dado que el tamaño de lote es 3, el conjunto de datos de resultados contiene 3 elementos, el último de los cuales de tamaño del lote es 1. Sin embargo usuario también es capaz de soltar el último lote si el tamaño es más pequeño que el tamaño del lote, permitiendo drop_final_batch
. P.ej:
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=3,
drop_final_batch=True,
num_epochs=1)
for record in dataset:
print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100, 2, 3], dtype=int32)>}
También se puede aumentar num_parallel_reads para acelerar el procesamiento de datos de Avro aumentando el paralelismo de análisis / lectura de avro.
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
num_parallel_reads=16,
batch_size=3,
drop_final_batch=True,
num_epochs=1)
for record in dataset:
print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100, 2, 3], dtype=int32)>}
Para el uso detallado de make_avro_record_dataset
, por favor refiérase a la documentación del API .
Entrene modelos de tf.keras con el conjunto de datos de Avro
Ahora veamos un ejemplo de un extremo a otro del entrenamiento del modelo tf.keras con un conjunto de datos Avro basado en un conjunto de datos mnist.
Cargar train.avro
como conjunto de datos TensorFlow con Avro API conjunto de datos:
features = {
'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}
schema = tf.io.gfile.GFile('train.avsc').read()
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=1,
num_epochs=1)
Defina un modelo de keras simple:
def build_and_compile_cnn_model():
model = tf.keras.Sequential()
model.compile(optimizer='sgd', loss='mse')
return model
model = build_and_compile_cnn_model()
Entrene el modelo de keras con el conjunto de datos de Avro:
model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>} Consider rewriting this model with the Functional API. WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>} Consider rewriting this model with the Functional API. 1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00 <tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>
El conjunto de datos avro puede analizar y convertir cualquier dato avro en tensores de TensorFlow, incluidos registros en registros, mapas, matrices, ramas y enumeraciones. La información de análisis se pasa a la implementación del conjunto de datos avro como un mapa donde las claves codifican cómo analizar los valores de los datos codifican sobre cómo convertir los datos en tensores de TensorFlow, decidiendo el tipo primitivo (p. Ej., Bool, int, long, float, double, string ) así como el tipo de tensor (por ejemplo, escaso o denso). Se proporciona una lista de los tipos de analizadores de TensorFlow (consulte la Tabla 1) y la coerción de los tipos primitivos (Tabla 2).
Tabla 1 los tipos de analizadores TensorFlow admitidos:
Tipos de analizadores de TensorFlow | Tensores de TensorFlow | Explicación |
---|---|---|
tf.FixedLenFeature ([], tf.int32) | tensor denso | Analizar una característica de longitud fija; es decir, todas las filas tienen el mismo número constante de elementos, por ejemplo, solo un elemento o una matriz que siempre tiene el mismo número de elementos para cada fila |
tf.SparseFeature (index_key = ['key_1st_index', 'key_2nd_index'], value_key = 'key_value', dtype = tf.int64, size = [20, 50]) | tensor escaso | Analice una característica dispersa donde cada fila tiene una lista de índices y valores de longitud variable. La 'clave_índice' identifica los índices. La 'clave_valor' identifica el valor. El 'dtype' es el tipo de datos. El 'tamaño' es el valor de índice máximo esperado para cada entrada de índice |
tfio.experimental.columnar.VarLenFeatureWithRank ([], tf.int64) | tensor escaso | Analizar una característica de longitud variable; eso significa que cada fila de datos puede tener un número variable de elementos, por ejemplo, la primera fila tiene 5 elementos, la segunda fila tiene 7 elementos |
Tabla 2 la conversión admitida de tipos de Avro a tipos de TensorFlow:
Tipo primitivo avro | Tipo primitivo de TensorFlow |
---|---|
booleano: un valor binario | tf.bool |
bytes: una secuencia de bytes sin firmar de 8 bits | tf.string |
doble: número de coma flotante IEEE de 64 bits de doble precisión | tf.float64 |
enum: tipo de enumeración | tf.string usando el nombre del símbolo |
float: número de coma flotante IEEE de 32 bits de precisión simple | tf.float32 |
int: entero de 32 bits con signo | tf.int32 |
long: entero de 64 bits con signo | tf.int64 |
nulo: sin valor | usa el valor predeterminado |
cadena: secuencia de caracteres unicode | tf.string |
Un conjunto completo de ejemplos de Avro API conjunto de datos se proporciona dentro de las pruebas .