Análisis de componentes principales probabilístico (PCA) es una técnica de reducción de la dimensionalidad que analiza los datos a través de un espacio latente inferior dimensional ( Tipping y Bishop 1999 ). Se utiliza a menudo cuando faltan valores en los datos o para escalado multidimensional.
Importaciones
import functools
import warnings
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow.compat.v2 as tf
import tensorflow_probability as tfp
from tensorflow_probability import bijectors as tfb
from tensorflow_probability import distributions as tfd
tf.enable_v2_behavior()
plt.style.use("ggplot")
warnings.filterwarnings('ignore')
El modelo
Considere un conjunto de datos X={xn} de N puntos de datos, donde cada punto de datos es Ddimensional, vecxn in mathbbRD\(.Weaimtorepresenteach\) vecxn bajo una variable latente zn∈RK con dimensión inferior, K<D\(.Thesetofprincipalaxes\) mathbfW relaciona las variables latentes a los datos.
Específicamente, asumimos que cada variable latente se distribuye normalmente,
zn∼N(0,I).
El punto de datos correspondiente se genera a través de una proyección,
xn∣zn∼N(Wzn,σ2I),
donde la matriz W∈RD×K se conoce como los ejes principales. En PCA probabilística, estamos interesados normalmente en la estimación de los ejes principales W y el término de ruidoσ2.
El PCA probabilístico generaliza el PCA clásico. Al marginar la variable latente, la distribución de cada punto de datos es
xn∼N(0,WW⊤+σ2I).
PCA clásico es el caso específico de PCA probabilístico cuando la covarianza del ruido se hace, infinitesimalmente pequeña σ2→0.
Configuramos nuestro modelo a continuación. En nuestro análisis, suponemos σ es conocido, y en lugar de punto estimar W como un parámetro de modelo, colocamos un previo sobre él con el fin de inferir una distribución más ejes principales. Vamos a expresar el modelo como la PTF JointDistribution, en concreto, vamos a utilizar JointDistributionCoroutineAutoBatched .
def probabilistic_pca(data_dim, latent_dim, num_datapoints, stddv_datapoints):
w = yield tfd.Normal(loc=tf.zeros([data_dim, latent_dim]),
scale=2.0 * tf.ones([data_dim, latent_dim]),
name="w")
z = yield tfd.Normal(loc=tf.zeros([latent_dim, num_datapoints]),
scale=tf.ones([latent_dim, num_datapoints]),
name="z")
x = yield tfd.Normal(loc=tf.matmul(w, z),
scale=stddv_datapoints,
name="x")
num_datapoints = 5000
data_dim = 2
latent_dim = 1
stddv_datapoints = 0.5
concrete_ppca_model = functools.partial(probabilistic_pca,
data_dim=data_dim,
latent_dim=latent_dim,
num_datapoints=num_datapoints,
stddv_datapoints=stddv_datapoints)
model = tfd.JointDistributionCoroutineAutoBatched(concrete_ppca_model)
Los datos
Podemos utilizar el modelo para generar datos mediante el muestreo de la distribución previa conjunta.
actual_w, actual_z, x_train = model.sample()
print("Principal axes:")
print(actual_w)
Principal axes: tf.Tensor( [[ 2.2801023] [-1.1619819]], shape=(2, 1), dtype=float32)
Visualizamos el conjunto de datos.
plt.scatter(x_train[0, :], x_train[1, :], color='blue', alpha=0.1)
plt.axis([-20, 20, -20, 20])
plt.title("Data set")
plt.show()
Inferencia máxima a posteriori
Primero buscamos la estimación puntual de las variables latentes que maximiza la densidad de probabilidad posterior. Esto se conoce como máximo una inferencia (MAP) posteriori, y se realiza mediante el cálculo de los valores de W y Z que maximizan la densidad posterior p(W,Z∣X)∝p(W,Z,X).
w = tf.Variable(tf.random.normal([data_dim, latent_dim]))
z = tf.Variable(tf.random.normal([latent_dim, num_datapoints]))
target_log_prob_fn = lambda w, z: model.log_prob((w, z, x_train))
losses = tfp.math.minimize(
lambda: -target_log_prob_fn(w, z),
optimizer=tf.optimizers.Adam(learning_rate=0.05),
num_steps=200)
plt.plot(losses)
[<matplotlib.lines.Line2D at 0x7f19897a42e8>]
Podemos utilizar el modelo de datos de ejemplo para los valores inferidos para W y Z, y compare con el conjunto de datos real que condicionado a.
print("MAP-estimated axes:")
print(w)
_, _, x_generated = model.sample(value=(w, z, None))
plt.scatter(x_train[0, :], x_train[1, :], color='blue', alpha=0.1, label='Actual data')
plt.scatter(x_generated[0, :], x_generated[1, :], color='red', alpha=0.1, label='Simulated data (MAP)')
plt.legend()
plt.axis([-20, 20, -20, 20])
plt.show()
MAP-estimated axes: <tf.Variable 'Variable:0' shape=(2, 1) dtype=float32, numpy= array([[ 2.9135954], [-1.4826864]], dtype=float32)>
Inferencia variacional
MAP se puede utilizar para encontrar el modo (o uno de los modos) de la distribución posterior, pero no proporciona ninguna otra información al respecto. A continuación utilizamos la inferencia variacional, donde el Distribtion posterior p(W,Z∣X) se aproxima mediante una distribución variacional q(W,Z) parametrizado por λ. El objetivo es encontrar los parámetros variacionales λ que reducen al mínimo la divergencia KL entre Q y la parte posterior, KL(q(W,Z)∣∣p(W,Z∣X)), o equivalentemente, que maximizan la evidencia límite inferior, Eq(W,Z;λ)[logp(W,Z,X)−logq(W,Z;λ)].
qw_mean = tf.Variable(tf.random.normal([data_dim, latent_dim]))
qz_mean = tf.Variable(tf.random.normal([latent_dim, num_datapoints]))
qw_stddv = tfp.util.TransformedVariable(1e-4 * tf.ones([data_dim, latent_dim]),
bijector=tfb.Softplus())
qz_stddv = tfp.util.TransformedVariable(
1e-4 * tf.ones([latent_dim, num_datapoints]),
bijector=tfb.Softplus())
def factored_normal_variational_model():
qw = yield tfd.Normal(loc=qw_mean, scale=qw_stddv, name="qw")
qz = yield tfd.Normal(loc=qz_mean, scale=qz_stddv, name="qz")
surrogate_posterior = tfd.JointDistributionCoroutineAutoBatched(
factored_normal_variational_model)
losses = tfp.vi.fit_surrogate_posterior(
target_log_prob_fn,
surrogate_posterior=surrogate_posterior,
optimizer=tf.optimizers.Adam(learning_rate=0.05),
num_steps=200)
print("Inferred axes:")
print(qw_mean)
print("Standard Deviation:")
print(qw_stddv)
plt.plot(losses)
plt.show()
Inferred axes: <tf.Variable 'Variable:0' shape=(2, 1) dtype=float32, numpy= array([[ 2.4168603], [-1.2236133]], dtype=float32)> Standard Deviation: <TransformedVariable: dtype=float32, shape=[2, 1], fn="softplus", numpy= array([[0.0042499 ], [0.00598824]], dtype=float32)>
posterior_samples = surrogate_posterior.sample(50)
_, _, x_generated = model.sample(value=(posterior_samples))
# It's a pain to plot all 5000 points for each of our 50 posterior samples, so
# let's subsample to get the gist of the distribution.
x_generated = tf.reshape(tf.transpose(x_generated, [1, 0, 2]), (2, -1))[:, ::47]
plt.scatter(x_train[0, :], x_train[1, :], color='blue', alpha=0.1, label='Actual data')
plt.scatter(x_generated[0, :], x_generated[1, :], color='red', alpha=0.1, label='Simulated data (VI)')
plt.legend()
plt.axis([-20, 20, -20, 20])
plt.show()
Agradecimientos
Este tutorial fue escrito originalmente en Edward 1,0 ( fuente ). Agradecemos a todos los colaboradores por escribir y revisar esa versión.
Referencias
[1]: Michael E. Tipping y Christopher M. Bishop. Análisis probabilístico de componentes principales. Revista de la Sociedad Royal Statistical: Serie B (Metodología Estadística), 61 (3): 611-622, 1999.