Coverage for pybmc/bmc.py: 84%
83 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-10-14 21:12 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-10-14 21:12 +0000
1import numpy as np
2import pandas as pd
3import matplotlib.pyplot as plt
4from sklearn.model_selection import train_test_split
5import os
6from .inference_utils import gibbs_sampler, USVt_hat_extraction
7from .sampling_utils import coverage, rndm_m_random_calculator
10class BayesianModelCombination:
11 """
12 The main idea of this class is to perform BMM on the set of models that we choose
13 from the dataset class. What should this class contain:
14 + Orthogonalization step.
15 + Perform Bayesian inference on the training data that we extract from the Dataset class.
16 + Predictions for certain isotopes.
17 """
19 def __init__(self, models_list, data_dict, truth_column_name, weights=None):
20 """
21 Initialize the BayesianModelCombination class.
23 :param models_list: List of model names
24 :param data_dict: Dictionary from `load_data()` where each key is a model name and each value is a DataFrame of properties
25 :param truth_column_name: Name of the column containing the truth values.
26 :param weights: Optional initial weights for the models.
27 """
29 if not isinstance(models_list, list) or not all(isinstance(model, str) for model in models_list):
30 raise ValueError("The 'models' should be a list of model names (strings) for Bayesian Combination.")
31 if not isinstance(data_dict, dict) or not all(isinstance(df, pd.DataFrame) for df in data_dict.values()):
32 raise ValueError("The 'data_dict' should be a dictionary of pandas DataFrames, one per property.")
34 self.data_dict = data_dict
35 self.models_list = models_list
36 self.models = [m for m in models_list if m != 'truth']
37 self.weights = weights if weights is not None else None
38 self.truth_column_name = truth_column_name
41 def orthogonalize(self, property, train_df, components_kept):
42 """
43 Perform orthogonalization for the specified property using training data.
45 :param property: The nuclear property to orthogonalize on (e.g., 'BE').
46 :param train_index: Training data from split_data
47 :param components_kept: Number of SVD components to retain.
48 """
49 # Store selected property
50 self.current_property = property
52 # Extract the relevant DataFrame for that property
53 df = self.data_dict[property].copy()
54 self.selected_models_dataset = df # Store for train() and predict()
56 # Extract model outputs (only the model columns)
57 models_output_train = train_df[self.models]
58 model_predictions_train = models_output_train.values
60 # Mean prediction across models (per nucleus)
61 predictions_mean_train = np.mean(model_predictions_train, axis=1)
63 # Experimental truth values for the property
64 centered_experiment_train = train_df[self.truth_column_name].values - predictions_mean_train
66 # Center model predictions
67 model_predictions_train_centered = model_predictions_train - predictions_mean_train[:, None]
69 # Perform SVD
70 U, S, Vt = np.linalg.svd(model_predictions_train_centered)
72 # Dimensionality reduction
73 U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept) #type: ignore
75 # Save for training
76 self.centered_experiment_train = centered_experiment_train
77 self.U_hat = U_hat
78 self.Vt_hat = Vt_hat
79 self.S_hat = S_hat
80 self.Vt_hat_normalized = Vt_hat_normalized
81 self._predictions_mean_train = predictions_mean_train
84 def train(self, training_options=None):
85 """
86 Train the model combination using training data and optional training parameters.
88 :param training_data: Placeholder (not used).
89 :param training_options: Dictionary of training options. Keys:
90 - 'iterations': (int) Number of Gibbs iterations (default 50000)
91 - 'b_mean_prior': (np.ndarray) Prior mean vector (default zeros)
92 - 'b_mean_cov': (np.ndarray) Prior covariance matrix (default diag(S_hat²))
93 - 'nu0_chosen': (float) Degrees of freedom for variance prior (default 1.0)
94 - 'sigma20_chosen': (float) Prior variance (default 0.02)
95 """
96 if training_options is None:
97 training_options = {}
99 iterations = training_options.get('iterations', 50000)
100 num_components = self.U_hat.shape[1]
101 S_hat = self.S_hat
103 b_mean_prior = training_options.get('b_mean_prior', np.zeros(num_components))
104 b_mean_cov = training_options.get('b_mean_cov', np.diag(S_hat**2))
105 nu0_chosen = training_options.get('nu0_chosen', 1.0)
106 sigma20_chosen = training_options.get('sigma20_chosen', 0.02)
108 self.samples = gibbs_sampler(self.centered_experiment_train, self.U_hat, iterations, [b_mean_prior, b_mean_cov, nu0_chosen, sigma20_chosen])
112 def predict(self, property):
113 """
114 Predict a specified property using the model weights learned during training.
116 :param property: The property name to predict (e.g., 'ChRad').
117 :return:
118 - rndm_m: array of shape (n_samples, n_points), full posterior draws
119 - lower_df: DataFrame with columns domain_keys + ['Predicted_Lower']
120 - median_df: DataFrame with columns domain_keys + ['Predicted_Median']
121 - upper_df: DataFrame with columns domain_keys + ['Predicted_Upper']
122 """
123 if self.samples is None or self.Vt_hat is None:
124 raise ValueError("Must call `orthogonalize()` and `train()` before predicting.")
126 if property not in self.data_dict:
127 raise KeyError(f"Property '{property}' not found in data_dict.")
129 df = self.data_dict[property].copy()
131 # Infer domain and model columns
132 full_model_cols = self.models
133 domain_keys = [col for col in df.columns if col not in full_model_cols and col != self.truth_column_name]
135 # Determine which models are present
136 available_models = [m for m in full_model_cols if m in df.columns]
138 if len(available_models) == 0:
139 raise ValueError("No available trained models are present in prediction DataFrame.")
141 # Filter predictions and model weights
142 model_preds = df[available_models].values
143 domain_df = df[domain_keys].reset_index(drop=True)
145 rndm_m, (lower, median, upper) = rndm_m_random_calculator(model_preds, self.samples, self.Vt_hat)
147 # Build output DataFrames
148 lower_df = domain_df.copy()
150 lower_df["Predicted_Lower"] = lower
152 median_df = domain_df.copy()
153 median_df["Predicted_Median"] = median
155 upper_df = domain_df.copy()
156 upper_df["Predicted_Upper"] = upper
158 return rndm_m, lower_df, median_df, upper_df
160 def evaluate(self, domain_filter=None):
161 """
162 Evaluate the model combination using coverage calculation.
164 :param domain_filter: dict with optional domain key ranges, e.g., {"Z": (20, 30), "N": (20, 40)}
165 :return: coverage list for each percentile
166 """
167 df = self.data_dict[self.current_property]
169 if domain_filter:
170 # Inline optimized filtering
171 for col, cond in domain_filter.items():
172 if col == 'multi' and callable(cond):
173 df = df[df.apply(cond, axis=1)]
174 elif callable(cond):
175 df = df[cond(df[col])]
176 elif isinstance(cond, tuple) and len(cond) == 2:
177 df = df[df[col].between(*cond)]
178 elif isinstance(cond, list):
179 df = df[df[col].isin(cond)]
180 else:
181 df = df[df[col] == cond]
183 preds = df[self.models].to_numpy()
184 rndm_m, (lower, median, upper) = rndm_m_random_calculator(preds, self.samples, self.Vt_hat)
186 return coverage(np.arange(0, 101, 5), rndm_m, df, truth_column=self.truth_column_name)