API de jeu de données Avro

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

L'objectif de l' API Avro DataSet est de charger des données formatées Avro nativement dans tensorflow comme jeu de données tensorflow . Avro est un système de sérialisation de données similaire à Protocol Buffers. Il est largement utilisé dans Apache Hadoop où il peut fournir à la fois un format de sérialisation pour les données persistantes et un format filaire pour la communication entre les nœuds Hadoop. Les données Avro sont un format de données binaires compactées et orientées lignes. Il repose sur un schéma qui est stocké dans un fichier JSON séparé. Pour la spécification d' un format Avro et la déclaration de schéma, s'il vous plaît se référer au manuel officiel .

Paquet d'installation

Installez le package tensorflow-io requis

pip install tensorflow-io

Importer des packages

import tensorflow as tf
import tensorflow_io as tfio

Valider les importations tf et tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Usage

Explorer l'ensemble de données

Pour les besoins de ce didacticiel, téléchargeons l'exemple de jeu de données Avro.

Téléchargez un exemple de fichier Avro :

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Téléchargez le fichier de schéma correspondant de l'exemple de fichier Avro :

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

Dans l'exemple ci-dessus, un ensemble de données de test Avro a été créé sur la base de l'ensemble de données mnist. L'ensemble de données mnist d' origine au format TFRecord est généré à partir ensemble de données nommé TF . Cependant, l'ensemble de données mnist est trop volumineux en tant qu'ensemble de données de démonstration. Par souci de simplicité, la plupart ont été rognés et seuls les premiers enregistrements ont été conservés. De plus, règlage supplémentaire a été fait pour l' image champ dans les données mnist d' origine et mis en correspondance à des features champ Avro. Ainsi , le fichier Avro train.avro dispose de 4 disques, dont chacun a 3 champs: features , ce qui est un tableau de int, label , un int ou nulle, et dataType , un ENUM. Pour afficher le décodé train.avro (Notez le fichier de données Avro d' origine ne sont pas lisible par l' homme comme Avro est un format compact):

Installez le package requis pour lire le fichier Avro :

pip install avro

Pour lire et imprimer un fichier Avro dans un format lisible par l'homme :

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

Et le schéma de train.avro qui est représenté par train.avsc est un fichier au format JSON. Pour voir l' train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Préparer le jeu de données

Charge train.avro comme jeu de données tensorflow avec l' API de jeu de données Avro:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

Les convertis par exemple ci - dessus train.avro en jeu de données tensorflow. Chaque élément du jeu de données est un dictionnaire dont la clé est le nom de la caractéristique, la valeur est le tenseur clairsemé ou dense converti. Par exemple, il convertit les features , label , dataType champ à une VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor) et FixedLenFeature (DenseTensor) , respectivement. Depuis batch_size est 3, il contraignent 3 enregistrements de train.avro en un seul élément dans l'ensemble de données de résultat. Pour le premier enregistrement de train.avro dont l' étiquette est nulle, remplace lecteur avro avec la valeur par défaut spécifiée (-100). Dans cet exemple, il sont 4 dossiers au total en train.avro . Étant donné que la taille du lot est 3, le jeu de données de résultats contient 3 éléments, dont la dernière taille du lot de 1. Mais l' utilisateur est également en mesure de déposer le dernier lot si la taille est plus petite que la taille des lots en permettant drop_final_batch . Par exemple:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

On peut également augmenter num_parallel_reads pour accélérer le traitement des données Avro en augmentant le parallélisme d'analyse/lecture d'avro.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Pour l' utilisation détaillée des make_avro_record_dataset , s'il vous plaît se référer à API doc .

Entraîner les modèles tf.keras avec le jeu de données Avro

Voyons maintenant un exemple de bout en bout d'apprentissage du modèle tf.keras avec un jeu de données Avro basé sur un jeu de données mnist.

Charge train.avro comme jeu de données tensorflow avec l' API de jeu de données Avro:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Définissez un modèle de keras simple :

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Entraînez le modèle keras avec l'ensemble de données Avro :

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

L'ensemble de données avro peut analyser et forcer toutes les données avro dans des tenseurs TensorFlow, y compris des enregistrements dans des enregistrements, des cartes, des tableaux, des branches et des énumérations. Les informations d'analyse sont transmises à l'implémentation de l'ensemble de données avro sous forme de carte où les clés encodent comment analyser les valeurs de données encodent sur la façon de forcer les données dans les tenseurs TensorFlow - en décidant du type primitif (par exemple bool, int, long, float, double, string ) ainsi que le type de tenseur (par exemple clairsemé ou dense). Une liste des types d'analyseur syntaxique de TensorFlow (voir le tableau 1) et la coercition des types primitifs (tableau 2) est fournie.

Tableau 1 des types d'analyseurs TensorFlow pris en charge :

Types d'analyseur TensorFlow TensorFlow Tensors Explication
tf.FixedLenFeature([], tf.int32) tenseur dense Analyser une entité de longueur fixe ; c'est-à-dire que toutes les lignes ont le même nombre constant d'éléments, par exemple un seul élément ou un tableau qui a toujours le même nombre d'éléments pour chaque ligne
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) tenseur clairsemé Analysez une entité clairsemée où chaque ligne a une liste de longueur variable d'indices et de valeurs. Le 'index_key' identifie les indices. La 'value_key' identifie la valeur. Le 'dtype' est le type de données. La 'taille' est la valeur d'index maximale attendue pour chaque entrée d'index
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) tenseur clairsemé Analyser une entité de longueur variable ; cela signifie que chaque ligne de données peut avoir un nombre variable d'éléments, par exemple la 1ère ligne a 5 éléments, la 2ème ligne a 7 éléments

Tableau 2 de la conversion prise en charge des types Avro vers les types TensorFlow :

Type primitif Avro Type primitif TensorFlow
booléen : une valeur binaire tf.bool
octets : une séquence d'octets non signés de 8 bits tf.string
double : nombre à virgule flottante IEEE 64 bits double précision tf.float64
enum : type d'énumération tf.string utilisant le nom du symbole
float : nombre à virgule flottante IEEE 32 bits simple précision tf.float32
entier : entier signé 32 bits tf.int32
long : entier signé 64 bits tf.int64
nul : aucune valeur utilise la valeur par défaut
chaîne : séquence de caractères unicode tf.string

Un ensemble complet d'exemples de l' API de jeu de données Avro est fournie dans les essais .