Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Visão geral
Isto mostra tutorial como usar leitor BigQuery TensorFlow para treinamento da rede neural usando a API sequencial Keras.
Conjunto de dados
Este tutorial usa o United States Census Renda Dataset fornecido pelo Repositório Aprendizado de Máquina UC Irvine . Esse conjunto de dados contém informações sobre pessoas de um banco de dados do Censo de 1994, incluindo idade, educação, estado civil, ocupação e se ganham mais de US$ 50.000 por ano.
Configurar
Configurar seu projeto do GCP
As etapas a seguir são necessárias, independentemente do ambiente do seu notebook.
- Selecione ou crie um projeto do GCP.
- Certifique-se de que o faturamento esteja ativado para seu projeto.
- Ativar a API BigQuery Storage
- Insira o ID do seu projeto na célula abaixo. Em seguida, execute a célula para garantir que o Cloud SDK use o projeto certo para todos os comandos neste notebook.
Instale os pacotes necessários e reinicie o tempo de execução
try:
# Use the Colab's preinstalled TensorFlow 2.x
%tensorflow_version 2.x
except:
pass
pip install fastavro
pip install tensorflow-io==0.9.0
pip install google-cloud-bigquery-storage
Autenticar
from google.colab import auth
auth.authenticate_user()
print('Authenticated')
Defina seu ID do PROJETO
PROJECT_ID = "<YOUR PROJECT>"
! gcloud config set project $PROJECT_ID
%env GCLOUD_PROJECT=$PROJECT_ID
Importar bibliotecas Python, definir constantes
from __future__ import absolute_import, division, print_function, unicode_literals
import os
from six.moves import urllib
import tempfile
import numpy as np
import pandas as pd
import tensorflow as tf
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError
LOCATION = 'us'
# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')
# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE)
DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'
CSV_SCHEMA = [
bigquery.SchemaField("age", "FLOAT64"),
bigquery.SchemaField("workclass", "STRING"),
bigquery.SchemaField("fnlwgt", "FLOAT64"),
bigquery.SchemaField("education", "STRING"),
bigquery.SchemaField("education_num", "FLOAT64"),
bigquery.SchemaField("marital_status", "STRING"),
bigquery.SchemaField("occupation", "STRING"),
bigquery.SchemaField("relationship", "STRING"),
bigquery.SchemaField("race", "STRING"),
bigquery.SchemaField("gender", "STRING"),
bigquery.SchemaField("capital_gain", "FLOAT64"),
bigquery.SchemaField("capital_loss", "FLOAT64"),
bigquery.SchemaField("hours_per_week", "FLOAT64"),
bigquery.SchemaField("native_country", "STRING"),
bigquery.SchemaField("income_bracket", "STRING"),
]
UNUSED_COLUMNS = ["fnlwgt", "education_num"]
Importar dados do censo para o BigQuery
Definir métodos auxiliares para carregar dados no BigQuery
def create_bigquery_dataset_if_necessary(dataset_id):
# Construct a full Dataset object to send to the API.
client = bigquery.Client(project=PROJECT_ID)
dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
dataset.location = LOCATION
try:
dataset = client.create_dataset(dataset) # API request
return True
except GoogleAPIError as err:
if err.code != 409: # http_client.CONFLICT
raise
return False
def load_data_into_bigquery(url, table_id):
create_bigquery_dataset_if_necessary(DATASET_ID)
client = bigquery.Client(project=PROJECT_ID)
dataset_ref = client.dataset(DATASET_ID)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
job_config.source_format = bigquery.SourceFormat.CSV
job_config.schema = CSV_SCHEMA
load_job = client.load_table_from_uri(
url, table_ref, job_config=job_config
)
print("Starting job {}".format(load_job.job_id))
load_job.result() # Waits for table load to complete.
print("Job finished.")
destination_table = client.get_table(table_ref)
print("Loaded {} rows.".format(destination_table.num_rows))
Carregue os dados do Censo no BigQuery.
load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID)
load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID)
Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187 Job finished. Loaded 32561 rows. Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a Job finished. Loaded 16278 rows.
Confirme se os dados foram importados
TODO: substitua <YOUR PROJECT> pelo seu PROJECT_ID
%%bigquery --use_bqstorage_api
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
Carregar dados do censo no TensorFlow DataSet usando o leitor do BigQuery
Leia e transforme dados do censo do BigQuery no TensorFlow DataSet
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
def transform_row(row_dict):
# Trim all string tensors
trimmed_dict = { column:
(tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
for (column,tensor) in row_dict.items()
}
# Extract feature column
income_bracket = trimmed_dict.pop('income_bracket')
# Convert feature column to 0.0/1.0
income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'),
lambda: tf.constant(1.0),
lambda: tf.constant(0.0))
return (trimmed_dict, income_bracket_float)
def read_bigquery(table_name):
tensorflow_io_bigquery_client = BigQueryClient()
read_session = tensorflow_io_bigquery_client.read_session(
"projects/" + PROJECT_ID,
PROJECT_ID, table_name, DATASET_ID,
list(field.name for field in CSV_SCHEMA
if not field.name in UNUSED_COLUMNS),
list(dtypes.double if field.field_type == 'FLOAT64'
else dtypes.string for field in CSV_SCHEMA
if not field.name in UNUSED_COLUMNS),
requested_streams=2)
dataset = read_session.parallel_read_rows()
transformed_ds = dataset.map(transform_row)
return transformed_ds
BATCH_SIZE = 32
training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)
Definir colunas de recursos
def get_categorical_feature_values(column):
query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
client = bigquery.Client(project=PROJECT_ID)
dataset_ref = client.dataset(DATASET_ID)
job_config = bigquery.QueryJobConfig()
query_job = client.query(query, job_config=job_config)
result = query_job.to_dataframe()
return result.values[:,0]
from tensorflow import feature_column
feature_columns = []
# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
feature_columns.append(feature_column.numeric_column(header))
# categorical cols
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
'race', 'native_country', 'education']:
categorical_feature = feature_column.categorical_column_with_vocabulary_list(
header, get_categorical_feature_values(header))
categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
feature_columns.append(categorical_feature_one_hot)
# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
Construir e treinar modelo
Construir modelo
Dense = tf.keras.layers.Dense
model = tf.keras.Sequential(
[
feature_layer,
Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
Dense(75, activation=tf.nn.relu),
Dense(50, activation=tf.nn.relu),
Dense(25, activation=tf.nn.relu),
Dense(1, activation=tf.nn.sigmoid)
])
# Compile Keras model
model.compile(
loss='binary_crossentropy',
metrics=['accuracy'])
Modelo de trem
model.fit(training_ds, epochs=5)
WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2. The layer has dtype float32 because it's dtype defaults to floatx. If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2. To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor. WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version. Instructions for updating: The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead. WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version. Instructions for updating: The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead. Epoch 1/5 1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105 Epoch 2/5 1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324 Epoch 3/5 1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393 Epoch 4/5 1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435 Epoch 5/5 1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455 <tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0>
Avaliar modelo
Avaliar modelo
loss, accuracy = model.evaluate(eval_ds)
print("Accuracy", accuracy)
509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398 Accuracy 0.8398452
Avalie algumas amostras aleatórias
sample_x = {
'age' : np.array([56, 36]),
'workclass': np.array(['Local-gov', 'Private']),
'education': np.array(['Bachelors', 'Bachelors']),
'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']),
'occupation': np.array(['Tech-support', 'Other-service']),
'relationship': np.array(['Husband', 'Husband']),
'race': np.array(['White', 'Black']),
'gender': np.array(['Male', 'Male']),
'capital_gain': np.array([0, 7298]),
'capital_loss': np.array([0, 0]),
'hours_per_week': np.array([40, 36]),
'native_country': np.array(['United-States', 'United-States'])
}
model.predict(sample_x)
array([[0.5541261], [0.6209938]], dtype=float32)