Coverage for pybmc/inference_utils.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-27 15:48 +0000

1import numpy as np 

2 

3 

4def gibbs_sampler(y, X, iterations, prior_info): 

5 """ 

6 Performs Gibbs sampling for Bayesian linear regression. 

7 

8 Args: 

9 y (numpy.ndarray): Response vector (centered). 

10 X (numpy.ndarray): Design matrix. 

11 iterations (int): Number of sampling iterations. 

12 prior_info (tuple[numpy.ndarray, numpy.ndarray, float, float]): Prior parameters: 

13 - `b_mean_prior` (numpy.ndarray): Prior mean for coefficients. 

14 - `b_mean_cov` (numpy.ndarray): Prior covariance matrix. 

15 - `nu0` (float): Prior degrees of freedom for variance. 

16 - `sigma20` (float): Prior variance. 

17 

18 Returns: 

19 numpy.ndarray: Posterior samples `[beta, sigma]`. 

20 """ 

21 b_mean_prior, b_mean_cov, nu0, sigma20 = prior_info 

22 b_mean_cov_inv = np.linalg.inv(b_mean_cov) 

23 n = len(y) 

24 

25 X_T_X = X.T.dot(X) 

26 X_T_X_inv = np.linalg.inv(X_T_X) 

27 

28 b_data = X_T_X_inv.dot(X.T).dot(y) 

29 supermodel = X.dot(b_data) 

30 residuals = y - supermodel 

31 sigma2 = np.sum(residuals**2) / len(residuals) 

32 cov_matrix = sigma2 * X_T_X_inv 

33 

34 samples = [] 

35 

36 # Initialize sigma2 with a small positive value to avoid division by zero 

37 sigma2 = max(sigma2, 1e-6) 

38 

39 for i in range(iterations): 

40 # Regularize the covariance matrix to ensure it is positive definite 

41 cov_matrix = np.linalg.inv(X_T_X / sigma2 + b_mean_cov_inv + np.eye(X_T_X.shape[0]) * 1e-6) 

42 mean_vector = cov_matrix.dot( 

43 b_mean_cov_inv.dot(b_mean_prior) + X.T.dot(y) / sigma2 

44 ) 

45 b_current = np.random.multivariate_normal(mean_vector, cov_matrix) 

46 

47 # Sample from the conditional posterior of sigma2 given bs and data 

48 supermodel = X.dot(b_current) 

49 residuals = y - supermodel 

50 shape_post = (nu0 + n) / 2.0 

51 scale_post = (nu0 * sigma20 + np.sum(residuals**2)) / 2.0 

52 sigma2 = max(1 / np.random.default_rng().gamma(shape_post, 1 / scale_post), 1e-6) 

53 

54 samples.append(np.append(b_current, np.sqrt(sigma2))) 

55 

56 return np.array(samples) 

57 

58 

59def gibbs_sampler_simplex( 

60 y, X, Vt_hat, S_hat, iterations, prior_info, burn=10000, stepsize=0.001 

61): 

62 """ 

63 Performs Gibbs sampling with simplex constraints on model weights. 

64 

65 Args: 

66 y (numpy.ndarray): Centered response vector. 

67 X (numpy.ndarray): Design matrix of principal components. 

68 Vt_hat (numpy.ndarray): Normalized right singular vectors. 

69 S_hat (numpy.ndarray): Singular values. 

70 iterations (int): Number of sampling iterations. 

71 prior_info (list[float]): `[nu0, sigma20]` - prior parameters for variance. 

72 burn (int, optional): Burn-in iterations (default: 10000). 

73 stepsize (float, optional): Proposal step size (default: 0.001). 

74 

75 Returns: 

76 numpy.ndarray: Posterior samples `[beta, sigma]`. 

77 """ 

78 bias0 = np.full(len(Vt_hat.T), 1 / len(Vt_hat.T)) 

79 nu0, sigma20 = prior_info 

80 cov_matrix_step = np.diag(S_hat**2 * stepsize**2) 

81 n = len(y) 

82 b_current = np.full(len(X.T), 0) 

83 supermodel_current = X.dot(b_current) 

84 residuals_current = y - supermodel_current 

85 log_likelihood_current = -np.sum(residuals_current**2) 

86 sigma2 = -log_likelihood_current / len(residuals_current) 

87 samples = [] 

88 acceptance = 0 

89 

90 # Validate inputs 

91 if burn < 0: 

92 raise ValueError("Burn-in iterations must be non-negative.") 

93 if stepsize <= 0: 

94 raise ValueError("Stepsize must be positive.") 

95 

96 # Burn-in phase 

97 for i in range(burn): 

98 b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step) 

99 omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0 

100 

101 # Skip proposals with negative weights 

102 if not np.any(omegas_proposed < 0): 

103 supermodel_proposed = X.dot(b_proposed) 

104 residuals_proposed = y - supermodel_proposed 

105 log_likelihood_proposed = -np.sum(residuals_proposed**2) 

106 acceptance_prob = min( 

107 1, 

108 np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2), 

109 ) 

110 if np.random.uniform() < acceptance_prob: 

111 b_current = np.copy(b_proposed) 

112 log_likelihood_current = log_likelihood_proposed 

113 

114 # Sample variance 

115 shape_post = (nu0 + n) / 2.0 

116 scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0 

117 sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post) 

118 

119 # Sampling phase 

120 for i in range(iterations): 

121 b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step) 

122 omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0 

123 

124 if not np.any(omegas_proposed < 0): 

125 supermodel_proposed = X.dot(b_proposed) 

126 residuals_proposed = y - supermodel_proposed 

127 log_likelihood_proposed = -np.sum(residuals_proposed**2) 

128 acceptance_prob = min( 

129 1, 

130 np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2), 

131 ) 

132 if np.random.uniform() < acceptance_prob: 

133 b_current = np.copy(b_proposed) 

134 log_likelihood_current = log_likelihood_proposed 

135 acceptance += 1 

136 

137 # Sample variance 

138 shape_post = (nu0 + n) / 2.0 

139 scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0 

140 sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post) 

141 samples.append(np.append(b_current, np.sqrt(sigma2))) 

142 

143 print(f"Acceptance rate: {acceptance/iterations*100:.2f}%") 

144 return np.array(samples) 

145 

146 

147def USVt_hat_extraction(U, S, Vt, components_kept): 

148 """ 

149 Extracts reduced-dimensionality matrices from SVD results. 

150 

151 Args: 

152 U (numpy.ndarray): Left singular vectors. 

153 S (numpy.ndarray): Singular values. 

154 Vt (numpy.ndarray): Right singular vectors (transposed). 

155 components_kept (int): Number of components to retain. 

156 

157 Returns: 

158 tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]: 

159 - `U_hat` (numpy.ndarray): Reduced left singular vectors. 

160 - `S_hat` (numpy.ndarray): Retained singular values. 

161 - `Vt_hat` (numpy.ndarray): Normalized right singular vectors. 

162 - `Vt_hat_normalized` (numpy.ndarray): Original right singular vectors. 

163 """ 

164 U_hat = np.array([U.T[i] for i in range(components_kept)]).T 

165 S_hat = S[:components_kept] 

166 Vt_hat = np.array([Vt[i] / S[i] for i in range(components_kept)]) 

167 Vt_hat_normalized = np.array([Vt[i] for i in range(components_kept)]) 

168 return U_hat, S_hat, Vt_hat, Vt_hat_normalized