Conjuntos de dados Tensorflow de coleções do MongoDB

Veja no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial se concentra na preparação tf.data.Dataset s através da leitura de dados a partir de coleções MongoDB e usá-lo para treinar um tf.keras modelo.

Pacotes de configuração

Isto usa tutoriais pymongo como um pacote auxiliar para criar um novo banco de dados MongoDB e coleção para armazenar os dados.

Instale os pacotes tensorflow-io e mongodb (auxiliar) necessários

pip install -q tensorflow-io
pip install -q pymongo

Importar pacotes

import os
import time
from pprint import pprint
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_io as tfio
from pymongo import MongoClient

Validar importações de tf e tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.20.0
tensorflow version: 2.6.0

Baixe e configure a instância do MongoDB

Para fins de demonstração, a versão de código aberto do mongodb é usada.


sudo apt install -y mongodb >log
service mongodb start

* Starting database mongodb
   ...done.
WARNING: apt does not have a stable CLI interface. Use with caution in scripts.

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 8.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin:
# Sleep for few seconds to let the instance start.
time.sleep(5)

Uma vez que a instância foi iniciada, grep para mongo nos processos lista para confirmar a disponibilidade.


ps -ef | grep mongo
mongodb      580       1 13 17:38 ?        00:00:00 /usr/bin/mongod --config /etc/mongodb.conf
root         612     610  0 17:38 ?        00:00:00 grep mongo

consulte o endpoint base para recuperar informações sobre o cluster.

client = MongoClient()
client.list_database_names() # ['admin', 'local']
['admin', 'local']

Explorar o conjunto de dados

Para o propósito deste tutorial, vamos baixar o Petfinder conjunto de dados e alimentar os dados no MongoDB manualmente. O objetivo desse problema de classificação é prever se o pet será adotado ou não.

dataset_url = 'http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip'
csv_file = 'datasets/petfinder-mini/petfinder-mini.csv'
tf.keras.utils.get_file('petfinder_mini.zip', dataset_url,
                        extract=True, cache_dir='.')
pf_df = pd.read_csv(csv_file)
Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip
1671168/1668792 [==============================] - 0s 0us/step
1679360/1668792 [==============================] - 0s 0us/step
pf_df.head()

Para o propósito do tutorial, modificações são feitas na coluna do rótulo. 0 indicará que o animal de estimação não foi adotado e 1 indicará que foi.

# In the original dataset "4" indicates the pet was not adopted.
pf_df['target'] = np.where(pf_df['AdoptionSpeed']==4, 0, 1)

# Drop un-used columns.
pf_df = pf_df.drop(columns=['AdoptionSpeed', 'Description'])
# Number of datapoints and columns
len(pf_df), len(pf_df.columns)
(11537, 14)

Dividir o conjunto de dados

train_df, test_df = train_test_split(pf_df, test_size=0.3, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
Number of training samples:  8075
Number of testing sample:  3462

Armazene os dados de trem e teste nas coleções do mongo

URI = "mongodb://localhost:27017"
DATABASE = "tfiodb"
TRAIN_COLLECTION = "train"
TEST_COLLECTION = "test"
db = client[DATABASE]
if "train" not in db.list_collection_names():
  db.create_collection(TRAIN_COLLECTION)
if "test" not in db.list_collection_names():
  db.create_collection(TEST_COLLECTION)
def store_records(collection, records):
  writer = tfio.experimental.mongodb.MongoDBWriter(
      uri=URI, database=DATABASE, collection=collection
  )
  for record in records:
      writer.write(record)
store_records(collection="train", records=train_df.to_dict("records"))
time.sleep(2)
store_records(collection="test", records=test_df.to_dict("records"))

Preparar conjuntos de dados tfio

Uma vez que os dados estão disponíveis no aglomerado, o mongodb.MongoDBIODataset classe é utilizado para esta finalidade. Os herda da classe de tf.data.Dataset e, portanto, expõe todas as funcionalidades úteis do tf.data.Dataset fora da caixa.

Conjunto de dados de treinamento

train_ds = tfio.experimental.mongodb.MongoDBIODataset(
        uri=URI, database=DATABASE, collection=TRAIN_COLLECTION
    )

train_ds
Connection successful: mongodb://localhost:27017
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow/python/data/experimental/ops/counter.py:66: scan (from tensorflow.python.data.experimental.ops.scan_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.scan(...) instead
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow_io/python/experimental/mongodb_dataset_ops.py:114: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)
<MongoDBIODataset shapes: (), types: tf.string>

Cada item train_ds é uma string que precisa ser decodificado em um JSON. Para fazer isso, você pode selecionar apenas um subconjunto das colunas especificando o TensorSpec

# Numeric features.
numerical_cols = ['PhotoAmt', 'Fee'] 

SPECS = {
    "target": tf.TensorSpec(tf.TensorShape([]), tf.int64, name="target"),
}
for col in numerical_cols:
  SPECS[col] = tf.TensorSpec(tf.TensorShape([]), tf.int32, name=col)
pprint(SPECS)
{'Fee': TensorSpec(shape=(), dtype=tf.int32, name='Fee'),
 'PhotoAmt': TensorSpec(shape=(), dtype=tf.int32, name='PhotoAmt'),
 'target': TensorSpec(shape=(), dtype=tf.int64, name='target')}
BATCH_SIZE=32
train_ds = train_ds.map(
        lambda x: tfio.experimental.serialization.decode_json(x, specs=SPECS)
    )

# Prepare a tuple of (features, label)
train_ds = train_ds.map(lambda v: (v, v.pop("target")))
train_ds = train_ds.batch(BATCH_SIZE)

train_ds
<BatchDataset shapes: ({PhotoAmt: (None,), Fee: (None,)}, (None,)), types: ({PhotoAmt: tf.int32, Fee: tf.int32}, tf.int64)>

Testando conjunto de dados

test_ds = tfio.experimental.mongodb.MongoDBIODataset(
        uri=URI, database=DATABASE, collection=TEST_COLLECTION
    )
test_ds = test_ds.map(
        lambda x: tfio.experimental.serialization.decode_json(x, specs=SPECS)
    )
# Prepare a tuple of (features, label)
test_ds = test_ds.map(lambda v: (v, v.pop("target")))
test_ds = test_ds.batch(BATCH_SIZE)

test_ds
Connection successful: mongodb://localhost:27017
<BatchDataset shapes: ({PhotoAmt: (None,), Fee: (None,)}, (None,)), types: ({PhotoAmt: tf.int32, Fee: tf.int32}, tf.int64)>

Definir as camadas de pré-processamento do keras

De acordo com o tutorial de dados estruturados , recomenda-se usar as Camadas Keras pré-processamento como eles são mais intuitivo, e pode ser facilmente integrado com os modelos. No entanto, o padrão feature_columns também pode ser usado.

Para uma melhor compreensão dos preprocessing_layers na classificação dados estruturados, consulte o tutorial de dados estruturados

def get_normalization_layer(name, dataset):
  # Create a Normalization layer for our feature.
  normalizer = preprocessing.Normalization(axis=None)

  # Prepare a Dataset that only yields our feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the statistics of the data.
  normalizer.adapt(feature_ds)

  return normalizer
all_inputs = []
encoded_features = []

for header in numerical_cols:
  numeric_col = tf.keras.Input(shape=(1,), name=header)
  normalization_layer = get_normalization_layer(header, train_ds)
  encoded_numeric_col = normalization_layer(numeric_col)
  all_inputs.append(numeric_col)
  encoded_features.append(encoded_numeric_col)

Construir, compilar e treinar o modelo

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# Convert the feature columns into a tf.keras layer
all_features = tf.keras.layers.concatenate(encoded_features)

# design/build the model
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dropout(0.5)(x)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(all_inputs, output)
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
109/109 [==============================] - 1s 2ms/step - loss: 0.6261 - accuracy: 0.4711
Epoch 2/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5939 - accuracy: 0.6967
Epoch 3/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5900 - accuracy: 0.6993
Epoch 4/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5846 - accuracy: 0.7146
Epoch 5/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5824 - accuracy: 0.7178
Epoch 6/10
109/109 [==============================] - 0s 2ms/step - loss: 0.5778 - accuracy: 0.7233
Epoch 7/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5810 - accuracy: 0.7083
Epoch 8/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5791 - accuracy: 0.7149
Epoch 9/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5742 - accuracy: 0.7207
Epoch 10/10
109/109 [==============================] - 0s 2ms/step - loss: 0.5797 - accuracy: 0.7083
<keras.callbacks.History at 0x7f743229fe90>

Inferir sobre os dados de teste

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
109/109 [==============================] - 0s 2ms/step - loss: 0.5696 - accuracy: 0.7383
test loss, test acc: [0.569588840007782, 0.7383015751838684]

Referências: