Avro Dataset API

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Celem Avro zestawu danych API jest załadowanie Avro sformatowane dane natywnie pod TensorFlow jak TensorFlow zbiorze . Avro to system serializacji danych podobny do buforów protokołów. Jest szeroko stosowany w Apache Hadoop, gdzie może zapewnić zarówno format serializacji dla trwałych danych, jak i format przewodowy do komunikacji między węzłami Hadoop. Avro data to zorientowany na wiersze, skompaktowany format danych binarnych. Opiera się na schemacie, który jest przechowywany jako osobny plik JSON. Dla spec Avro deklaracji formatu i schematu, należy zapoznać się z oficjalnym podręczniku .

Pakiet instalacyjny

Zainstaluj wymagany pakiet tensorflow-io

pip install tensorflow-io

Importuj paczki

import tensorflow as tf
import tensorflow_io as tfio

Sprawdź poprawność importu tf i tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Stosowanie

Przeglądaj zbiór danych

Na potrzeby tego samouczka pobierzmy przykładowy zestaw danych Avro.

Pobierz przykładowy plik Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Pobierz odpowiedni plik schematu przykładowego pliku Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

W powyższym przykładzie testowy zestaw danych Avro został utworzony na podstawie zestawu danych mnist. Oryginalny mnist zestawu danych w formacie TFRecord jest generowany z TF o nazwie zestawu danych . Jednak zbiór danych mnist jest zbyt duży jak zbiór danych demonstracyjnych. Dla uproszczenia większość z nich została przycięta i zachowano tylko kilka pierwszych zapisów. Ponadto, dodatkowe przycinanie było zrobione dla image pola w oryginalnej mnist zbiorze i odwzorowany go do features polu Avro. Więc avro plik train.avro ma 4 rekordy, z których każda posiada 3 pola: features , które jest tablicą int, label , int lub null i dataType , wyliczenia. Aby zobaczyć dekodowany train.avro (Uwaga oryginalny Avro plik danych nie jest czytelny dla człowieka jako avro jest zagęszczony format):

Zainstaluj wymagany pakiet, aby odczytać plik Avro:

pip install avro

Aby odczytać i wydrukować plik Avro w formacie czytelnym dla człowieka:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

A schemat train.avro który jest reprezentowany przez train.avsc jest plikiem w formacie JSON. Aby zobaczyć train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Przygotuj zbiór danych

Obciążenie train.avro jak TensorFlow zbioru danych ze zbioru danych Avro API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

Powyższy przykład konwertuje train.avro do tensorflow zbiorze. Każdy element zestawu danych jest słownikiem, którego kluczem jest nazwa funkcji, a wartość to przekonwertowany tensor rzadki lub gęsty. Na przykład, konwertuje features , label , dataType pola dla VarLenFeature (SparseTensor) FixedLenFeature (DenseTensor) i FixedLenFeature (DenseTensor), odpowiednio. Od batch_size wynosi 3, to zmusić 3 rekordy z train.avro do jednego elementu w zbiorze wynikowym. Dla pierwszego rekordu w train.avro którego etykieta jest null, Avro zastępuje czytnik go z podanej wartości domyślnej (-100). W tym przykładzie, są tam 4 rekordy ogółem w train.avro . Ponieważ wielkość partii wynosi 3, zestaw danych wynik zawiera 3 elementy, z których ostatni na wielkość partii wynosi 1. Jednak użytkownik może również spaść ostatnią partię, jeśli rozmiar jest mniejszy niż rozmiar partii umożliwiając drop_final_batch . Np:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Można również zwiększyć liczbę num_parallel_reads, aby przyspieszyć przetwarzanie danych Avro, zwiększając równoległość parsowania/odczytu avro.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

W celu uzyskania szczegółowych wykorzystania make_avro_record_dataset , proszę odnieść się do API doc .

Trenuj modele tf.keras za pomocą zestawu danych Avro

Przejdźmy teraz przez kompletny przykład uczenia modelu tf.keras z zestawem danych Avro opartym na zestawie danych mnist.

Obciążenie train.avro jak TensorFlow zbioru danych ze zbioru danych Avro API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Zdefiniuj prosty model keras:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Trenuj model Keras za pomocą zestawu danych Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

Zestaw danych avro może analizować i przekształcać dowolne dane avro w tensory TensorFlow, w tym rekordy w rekordach, mapy, tablice, gałęzie i wyliczenia. Informacje o parsowaniu są przekazywane do implementacji zestawu danych avro w postaci mapy, w której klucze kodują sposób parsowania wartości danych kodują sposób przekonwertowania danych na tensory TensorFlow – decydowanie o typie podstawowym (np. bool, int, long, float, double, string ), a także typ tensora (np. rzadki lub gęsty). Przedstawiono listę typów parserów TensorFlow (patrz Tabela 1) oraz wymuszanie typów pierwotnych (Tabela 2).

Tabela 1 obsługiwane typy parserów TensorFlow:

Typy parserów TensorFlow TensorFlow Tensory Wyjaśnienie
tf.FixedLenFeature([], tf.int32) gęsty tensor Przeanalizuj element o stałej długości; czyli wszystkie wiersze mają taką samą stałą liczbę elementów, np. tylko jeden element lub tablica, która ma zawsze taką samą liczbę elementów w każdym wierszu
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) rzadki tensor Przeanalizuj rzadki element, w którym każdy wiersz ma listę indeksów i wartości o zmiennej długości. 'index_key' identyfikuje indeksy. „value_key” identyfikuje wartość. „dtype” to typ danych. „Rozmiar” to oczekiwana maksymalna wartość indeksu dla każdego wpisu indeksu
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) rzadki tensor Przeanalizuj funkcję o zmiennej długości; oznacza to, że każdy wiersz danych może mieć zmienną liczbę elementów, np. pierwszy wiersz ma 5 elementów, drugi wiersz ma 7 elementów

Tabela 2 obsługiwana konwersja z typów Avro na typy TensorFlow:

Typ prymitywny Avro Typ prymitywu TensorFlow
boolean: wartość binarna tf.bool
bytes: sekwencja 8-bitowych bajtów bez znaku tf.string
double: 64-bitowa liczba zmiennoprzecinkowa IEEE o podwójnej precyzji tf.float64
wyliczenie: typ wyliczenia tf.string używając nazwy symbolu
float: 32-bitowa liczba zmiennoprzecinkowa IEEE o pojedynczej precyzji tf.float32
int: 32-bitowa liczba całkowita ze znakiem tf.int32
long: 64-bitowa liczba całkowita ze znakiem tf.int64
null: brak wartości używa wartości domyślnej
ciąg: sekwencja znaków Unicode tf.string

Obszerny zestaw przykładów Avro zestawu danych API jest dostarczona w testach .