Coverage for pybmc/bmc.py: 84%

83 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-10-14 21:12 +0000

1import numpy as np 

2import pandas as pd 

3import matplotlib.pyplot as plt 

4from sklearn.model_selection import train_test_split 

5import os 

6from .inference_utils import gibbs_sampler, USVt_hat_extraction 

7from .sampling_utils import coverage, rndm_m_random_calculator 

8 

9 

10class BayesianModelCombination: 

11 """ 

12 The main idea of this class is to perform BMM on the set of models that we choose  

13 from the dataset class. What should this class contain: 

14 + Orthogonalization step. 

15 + Perform Bayesian inference on the training data that we extract from the Dataset class. 

16 + Predictions for certain isotopes. 

17 """ 

18 

19 def __init__(self, models_list, data_dict, truth_column_name, weights=None): 

20 """  

21 Initialize the BayesianModelCombination class. 

22 

23 :param models_list: List of model names 

24 :param data_dict: Dictionary from `load_data()` where each key is a model name and each value is a DataFrame of properties 

25 :param truth_column_name: Name of the column containing the truth values. 

26 :param weights: Optional initial weights for the models. 

27 """ 

28 

29 if not isinstance(models_list, list) or not all(isinstance(model, str) for model in models_list): 

30 raise ValueError("The 'models' should be a list of model names (strings) for Bayesian Combination.") 

31 if not isinstance(data_dict, dict) or not all(isinstance(df, pd.DataFrame) for df in data_dict.values()): 

32 raise ValueError("The 'data_dict' should be a dictionary of pandas DataFrames, one per property.") 

33 

34 self.data_dict = data_dict 

35 self.models_list = models_list 

36 self.models = [m for m in models_list if m != 'truth'] 

37 self.weights = weights if weights is not None else None 

38 self.truth_column_name = truth_column_name 

39 

40 

41 def orthogonalize(self, property, train_df, components_kept): 

42 """ 

43 Perform orthogonalization for the specified property using training data. 

44 

45 :param property: The nuclear property to orthogonalize on (e.g., 'BE'). 

46 :param train_index: Training data from split_data 

47 :param components_kept: Number of SVD components to retain. 

48 """ 

49 # Store selected property 

50 self.current_property = property 

51 

52 # Extract the relevant DataFrame for that property 

53 df = self.data_dict[property].copy() 

54 self.selected_models_dataset = df # Store for train() and predict() 

55 

56 # Extract model outputs (only the model columns) 

57 models_output_train = train_df[self.models] 

58 model_predictions_train = models_output_train.values 

59 

60 # Mean prediction across models (per nucleus) 

61 predictions_mean_train = np.mean(model_predictions_train, axis=1) 

62 

63 # Experimental truth values for the property 

64 centered_experiment_train = train_df[self.truth_column_name].values - predictions_mean_train 

65 

66 # Center model predictions 

67 model_predictions_train_centered = model_predictions_train - predictions_mean_train[:, None] 

68 

69 # Perform SVD 

70 U, S, Vt = np.linalg.svd(model_predictions_train_centered) 

71 

72 # Dimensionality reduction 

73 U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept) #type: ignore 

74 

75 # Save for training 

76 self.centered_experiment_train = centered_experiment_train 

77 self.U_hat = U_hat 

78 self.Vt_hat = Vt_hat 

79 self.S_hat = S_hat 

80 self.Vt_hat_normalized = Vt_hat_normalized 

81 self._predictions_mean_train = predictions_mean_train 

82 

83 

84 def train(self, training_options=None): 

85 """ 

86 Train the model combination using training data and optional training parameters. 

87 

88 :param training_data: Placeholder (not used). 

89 :param training_options: Dictionary of training options. Keys: 

90 - 'iterations': (int) Number of Gibbs iterations (default 50000) 

91 - 'b_mean_prior': (np.ndarray) Prior mean vector (default zeros) 

92 - 'b_mean_cov': (np.ndarray) Prior covariance matrix (default diag(S_hat²)) 

93 - 'nu0_chosen': (float) Degrees of freedom for variance prior (default 1.0) 

94 - 'sigma20_chosen': (float) Prior variance (default 0.02) 

95 """ 

96 if training_options is None: 

97 training_options = {} 

98 

99 iterations = training_options.get('iterations', 50000) 

100 num_components = self.U_hat.shape[1] 

101 S_hat = self.S_hat 

102 

103 b_mean_prior = training_options.get('b_mean_prior', np.zeros(num_components)) 

104 b_mean_cov = training_options.get('b_mean_cov', np.diag(S_hat**2)) 

105 nu0_chosen = training_options.get('nu0_chosen', 1.0) 

106 sigma20_chosen = training_options.get('sigma20_chosen', 0.02) 

107 

108 self.samples = gibbs_sampler(self.centered_experiment_train, self.U_hat, iterations, [b_mean_prior, b_mean_cov, nu0_chosen, sigma20_chosen]) 

109 

110 

111 

112 def predict(self, property): 

113 """ 

114 Predict a specified property using the model weights learned during training. 

115 

116 :param property: The property name to predict (e.g., 'ChRad'). 

117 :return: 

118 - rndm_m: array of shape (n_samples, n_points), full posterior draws 

119 - lower_df: DataFrame with columns domain_keys + ['Predicted_Lower'] 

120 - median_df: DataFrame with columns domain_keys + ['Predicted_Median'] 

121 - upper_df: DataFrame with columns domain_keys + ['Predicted_Upper'] 

122 """ 

123 if self.samples is None or self.Vt_hat is None: 

124 raise ValueError("Must call `orthogonalize()` and `train()` before predicting.") 

125 

126 if property not in self.data_dict: 

127 raise KeyError(f"Property '{property}' not found in data_dict.") 

128 

129 df = self.data_dict[property].copy() 

130 

131 # Infer domain and model columns 

132 full_model_cols = self.models 

133 domain_keys = [col for col in df.columns if col not in full_model_cols and col != self.truth_column_name] 

134 

135 # Determine which models are present 

136 available_models = [m for m in full_model_cols if m in df.columns] 

137 

138 if len(available_models) == 0: 

139 raise ValueError("No available trained models are present in prediction DataFrame.") 

140 

141 # Filter predictions and model weights 

142 model_preds = df[available_models].values 

143 domain_df = df[domain_keys].reset_index(drop=True) 

144 

145 rndm_m, (lower, median, upper) = rndm_m_random_calculator(model_preds, self.samples, self.Vt_hat) 

146 

147 # Build output DataFrames 

148 lower_df = domain_df.copy() 

149 

150 lower_df["Predicted_Lower"] = lower 

151 

152 median_df = domain_df.copy() 

153 median_df["Predicted_Median"] = median 

154 

155 upper_df = domain_df.copy() 

156 upper_df["Predicted_Upper"] = upper 

157 

158 return rndm_m, lower_df, median_df, upper_df 

159 

160 def evaluate(self, domain_filter=None): 

161 """ 

162 Evaluate the model combination using coverage calculation. 

163 

164 :param domain_filter: dict with optional domain key ranges, e.g., {"Z": (20, 30), "N": (20, 40)} 

165 :return: coverage list for each percentile 

166 """ 

167 df = self.data_dict[self.current_property] 

168 

169 if domain_filter: 

170 # Inline optimized filtering 

171 for col, cond in domain_filter.items(): 

172 if col == 'multi' and callable(cond): 

173 df = df[df.apply(cond, axis=1)] 

174 elif callable(cond): 

175 df = df[cond(df[col])] 

176 elif isinstance(cond, tuple) and len(cond) == 2: 

177 df = df[df[col].between(*cond)] 

178 elif isinstance(cond, list): 

179 df = df[df[col].isin(cond)] 

180 else: 

181 df = df[df[col] == cond] 

182 

183 preds = df[self.models].to_numpy() 

184 rndm_m, (lower, median, upper) = rndm_m_random_calculator(preds, self.samples, self.Vt_hat) 

185 

186 return coverage(np.arange(0, 101, 5), rndm_m, df, truth_column=self.truth_column_name) 

187 

188 

189 

190 

191 

192