Coverage for pybmc/inference_utils.py: 100%
79 statements
coverage.py v7.10.0, created at 2025-07-27 15:48 +0000
import numpy as np


def gibbs_sampler(y, X, iterations, prior_info):
    """
    Performs Gibbs sampling for Bayesian linear regression.

    Args:
        y (numpy.ndarray): Response vector (centered).
        X (numpy.ndarray): Design matrix.
        iterations (int): Number of sampling iterations.
        prior_info (tuple[numpy.ndarray, numpy.ndarray, float, float]): Prior parameters:
            - `b_mean_prior` (numpy.ndarray): Prior mean for the coefficients.
            - `b_mean_cov` (numpy.ndarray): Prior covariance matrix for the coefficients.
            - `nu0` (float): Prior degrees of freedom for the variance.
            - `sigma20` (float): Prior variance estimate.

    Returns:
        numpy.ndarray: Posterior samples of shape (iterations, p + 1); each
        row is `[beta_1, ..., beta_p, sigma]`.
    """
    b_mean_prior, b_mean_cov, nu0, sigma20 = prior_info
    b_mean_cov_inv = np.linalg.inv(b_mean_cov)
    n = len(y)

    X_T_X = X.T.dot(X)
    X_T_X_inv = np.linalg.inv(X_T_X)

    # OLS fit, used only to initialize the residual variance
    b_data = X_T_X_inv.dot(X.T).dot(y)
    supermodel = X.dot(b_data)
    residuals = y - supermodel
    sigma2 = np.sum(residuals**2) / len(residuals)
    cov_matrix = sigma2 * X_T_X_inv

    samples = []

    # Clamp sigma2 to a small positive value to avoid division by zero
    sigma2 = max(sigma2, 1e-6)
    for i in range(iterations):
        # Regularize the covariance matrix to ensure it is positive definite
        cov_matrix = np.linalg.inv(
            X_T_X / sigma2 + b_mean_cov_inv + np.eye(X_T_X.shape[0]) * 1e-6
        )
        mean_vector = cov_matrix.dot(
            b_mean_cov_inv.dot(b_mean_prior) + X.T.dot(y) / sigma2
        )
        b_current = np.random.multivariate_normal(mean_vector, cov_matrix)

        # Sample from the conditional posterior of sigma2 given the
        # coefficients and the data (inverse-gamma via a reciprocal gamma draw)
        supermodel = X.dot(b_current)
        residuals = y - supermodel
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 + np.sum(residuals**2)) / 2.0
        sigma2 = max(1 / np.random.default_rng().gamma(shape_post, 1 / scale_post), 1e-6)

        samples.append(np.append(b_current, np.sqrt(sigma2)))

    return np.array(samples)
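
# Usage sketch (illustrative only; `_demo_gibbs_sampler` and the weak prior
# values below are assumptions for this example, not part of the package API).
def _demo_gibbs_sampler():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 3))
    X -= X.mean(axis=0)  # center the predictors
    beta_true = np.array([1.0, -2.0, 0.5])
    y = X @ beta_true + rng.normal(scale=0.3, size=100)
    y -= y.mean()  # the sampler expects a centered response
    prior = (np.zeros(3), np.eye(3) * 10.0, 1.0, 1.0)
    draws = gibbs_sampler(y, X, iterations=2000, prior_info=prior)
    print(draws.mean(axis=0))  # posterior means of [beta_1, beta_2, beta_3, sigma]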

def gibbs_sampler_simplex(
    y, X, Vt_hat, S_hat, iterations, prior_info, burn=10000, stepsize=0.001
):
    """
    Performs Gibbs sampling with simplex constraints on the model weights.

    Args:
        y (numpy.ndarray): Centered response vector.
        X (numpy.ndarray): Design matrix of principal components.
        Vt_hat (numpy.ndarray): Right singular vectors scaled by the inverse
            singular values.
        S_hat (numpy.ndarray): Retained singular values.
        iterations (int): Number of sampling iterations.
        prior_info (list[float]): `[nu0, sigma20]` - prior parameters for the variance.
        burn (int, optional): Number of burn-in iterations (default: 10000).
        stepsize (float, optional): Proposal step size (default: 0.001).

    Returns:
        numpy.ndarray: Posterior samples of shape (iterations, k + 1); each
        row is `[beta_1, ..., beta_k, sigma]`.
    """
    # Validate inputs before they are used
    if burn < 0:
        raise ValueError("Burn-in iterations must be non-negative.")
    if stepsize <= 0:
        raise ValueError("Stepsize must be positive.")

    bias0 = np.full(len(Vt_hat.T), 1 / len(Vt_hat.T))
    nu0, sigma20 = prior_info
    cov_matrix_step = np.diag(S_hat**2 * stepsize**2)
    n = len(y)
    b_current = np.zeros(len(X.T))
    supermodel_current = X.dot(b_current)
    residuals_current = y - supermodel_current
    # Negative sum of squared residuals (the log-likelihood up to scaling)
    log_likelihood_current = -np.sum(residuals_current**2)
    sigma2 = -log_likelihood_current / len(residuals_current)
    samples = []
    acceptance = 0
    # Burn-in phase
    for i in range(burn):
        b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step)
        omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0

        # Skip proposals with negative weights (outside the simplex)
        if not np.any(omegas_proposed < 0):
            supermodel_proposed = X.dot(b_proposed)
            residuals_proposed = y - supermodel_proposed
            log_likelihood_proposed = -np.sum(residuals_proposed**2)
            acceptance_prob = min(
                1,
                np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2),
            )
            if np.random.uniform() < acceptance_prob:
                b_current = np.copy(b_proposed)
                log_likelihood_current = log_likelihood_proposed

        # Sample variance from its inverse-gamma conditional posterior
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0
        sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post)
    # Sampling phase
    for i in range(iterations):
        b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step)
        omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0

        # Skip proposals with negative weights (outside the simplex)
        if not np.any(omegas_proposed < 0):
            supermodel_proposed = X.dot(b_proposed)
            residuals_proposed = y - supermodel_proposed
            log_likelihood_proposed = -np.sum(residuals_proposed**2)
            acceptance_prob = min(
                1,
                np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2),
            )
            if np.random.uniform() < acceptance_prob:
                b_current = np.copy(b_proposed)
                log_likelihood_current = log_likelihood_proposed
                acceptance += 1

        # Sample variance from its inverse-gamma conditional posterior
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0
        sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post)
        samples.append(np.append(b_current, np.sqrt(sigma2)))
    print(f"Acceptance rate: {acceptance / iterations * 100:.2f}%")
    return np.array(samples)
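
# Usage sketch (illustrative only; the synthetic prediction matrix and
# `_demo_gibbs_sampler_simplex` are assumptions for this example, not part of
# the package API). It builds the SVD-derived inputs the sampler expects.
def _demo_gibbs_sampler_simplex():
    rng = np.random.default_rng(1)
    preds = rng.normal(size=(50, 4))  # hypothetical predictions of 4 models
    y = preds.mean(axis=1)
    y -= y.mean()  # centered toy response
    U, S, Vt = np.linalg.svd(preds, full_matrices=False)
    k = 3
    S_hat = S[:k]
    Vt_hat = Vt[:k] / S_hat[:, None]  # rows scaled by inverse singular values
    X_pc = U[:, :k] * S_hat  # principal-component design matrix
    draws = gibbs_sampler_simplex(
        y, X_pc, Vt_hat, S_hat, iterations=500, prior_info=[1.0, 1.0],
        burn=500, stepsize=0.001,
    )
    print(draws.shape)  # (500, 4): three beta coordinates plus sigma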

def USVt_hat_extraction(U, S, Vt, components_kept):
    """
    Extracts reduced-dimensionality matrices from SVD results.

    Args:
        U (numpy.ndarray): Left singular vectors.
        S (numpy.ndarray): Singular values.
        Vt (numpy.ndarray): Right singular vectors (transposed).
        components_kept (int): Number of components to retain.

    Returns:
        tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:
            - `U_hat` (numpy.ndarray): Retained left singular vectors.
            - `S_hat` (numpy.ndarray): Retained singular values.
            - `Vt_hat` (numpy.ndarray): Retained right singular vectors, each
              row scaled by its inverse singular value.
            - `Vt_hat_normalized` (numpy.ndarray): Retained right singular
              vectors, unscaled.
    """
    U_hat = U[:, :components_kept]
    S_hat = S[:components_kept]
    # Scale each retained right singular vector by its inverse singular value
    Vt_hat = Vt[:components_kept] / S_hat[:, None]
    Vt_hat_normalized = Vt[:components_kept]
    return U_hat, S_hat, Vt_hat, Vt_hat_normalized
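
# Usage sketch (illustrative only; `_demo_USVt_hat_extraction` and `Vt_trunc`
# are assumed names for this example, not part of the package API).
def _demo_USVt_hat_extraction():
    rng = np.random.default_rng(2)
    A = rng.normal(size=(6, 4))
    U, S, Vt = np.linalg.svd(A, full_matrices=False)
    U_hat, S_hat, Vt_hat, Vt_trunc = USVt_hat_extraction(U, S, Vt, components_kept=2)
    print(U_hat.shape, S_hat.shape, Vt_hat.shape, Vt_trunc.shape)  # (6, 2) (2,) (2, 4) (2, 4)
    # Rescaling Vt_hat by the singular values recovers the truncated Vt rows
    assert np.allclose(Vt_hat * S_hat[:, None], Vt_trunc)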