API Reference

This API reference provides detailed documentation for the classes and functions in the pybmc package. It is automatically generated from the docstrings in the source code.

Dataset

A general-purpose dataset class for loading and managing model data for Bayesian model combination workflows.

Supports .h5 and .csv files, and provides data splitting functionality.
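
A minimal usage sketch (the file name "nuclear_data.h5" and the model names are hypothetical placeholders, not shipped with pybmc):

from pybmc.data import Dataset

# Point the dataset at an HDF5 file where each model is stored under its own key.
dataset = Dataset("nuclear_data.h5", verbose=True)

# Build one wide DataFrame per property; "exp" plays the role of the truth model.
data_dict = dataset.load_data(
    models=["model_A", "model_B", "model_C", "exp"],
    keys=["BE"],
    domain_keys=["N", "Z"],
    truth_column_name="exp",
)
# data_dict["BE"] has columns: N, Z, model_A, model_B, model_C, exp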

Source code in pybmc/data.py
class Dataset:
    """
    A general-purpose dataset class for loading and managing model data
    for Bayesian model combination workflows.

    Supports .h5 and .csv files, and provides data splitting functionality.
    """

    def __init__(self, data_source=None, verbose=True):
        """
        Initialize the Dataset object.

        :param data_source: Path to the data file (.h5 or .csv).
        :param verbose: If True, display warnings and informational messages. Default is True.
        """
        self.data_source = data_source
        self.data = {}  # Dictionary mapping property name to DataFrame
        self.verbose = verbose
        self.logger = logging.getLogger(__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO if verbose else logging.WARNING)

    def load_data(self, models, keys=None, domain_keys=None, model_column='model', truth_column_name=None):
        """
        Load data for each property and return a dictionary of synchronized DataFrames.
        Each DataFrame has columns: domain_keys + one column per model for that property.

        Parameters:
            models (list): List of model names (for HDF5 keys or filtering CSV).
            keys (list): List of property names to extract (each will be a key in the output dict).
            domain_keys (list, optional): List of columns used to define the common domain (default ['N', 'Z']).
            model_column (str, optional): Name of the column in the CSV that identifies which model each row belongs to.
                                          Only used for CSV files; ignored for HDF5 files.
            truth_column_name (str, optional): Name of the truth model. If provided, the truth data will be 
                                               left-joined to the common domain of the other models, allowing 
                                               the truth data to have a smaller domain than the models.

        Returns:
            dict: Dictionary where each key is a property name and each value is a DataFrame with columns:
                  domain_keys + one column per model for that property.
                  The DataFrames are synchronized to the intersection of the domains for all models.
                  If truth_column_name is provided, truth data is left-joined (may have NaN values).

        Supports both .h5 and .csv files.
        """
        self.domain_keys = domain_keys 

        if self.data_source is None:
            raise ValueError("Data source must be specified.")
        if not os.path.exists(self.data_source):
            raise FileNotFoundError(f"Data source '{self.data_source}' not found.")
        if keys is None:
            raise ValueError("You must specify which properties to extract via 'keys'.")

        result = {}

        for prop in keys:
            dfs = []
            truth_df = None
            skipped_models = []

            # Separate regular models from truth model
            regular_models = [m for m in models if m != truth_column_name]

            if self.data_source.endswith('.h5'):
                for model in models:
                    df = pd.read_hdf(self.data_source, key=model)
                    # Check required columns
                    missing_cols = [col for col in domain_keys + [prop] if col not in df.columns]
                    if missing_cols:
                        self.logger.info(f"[Skipped] Model '{model}' missing columns {missing_cols} for property '{prop}'.")
                        skipped_models.append(model)
                        continue
                    temp = df[domain_keys + [prop]].copy()
                    temp.rename(columns={prop: model}, inplace=True) # type: ignore

                    # Store truth data separately if truth_column_name is provided
                    if truth_column_name and model == truth_column_name:
                        truth_df = temp
                    else:
                        dfs.append(temp)

            elif self.data_source.endswith('.csv'):
                df = pd.read_csv(self.data_source)
                for model in models:
                    if model_column not in df.columns:
                        raise ValueError(f"Expected column '{model_column}' not found in CSV.")
                    model_df = df[df[model_column] == model]
                    missing_cols = [col for col in domain_keys + [prop] if col not in model_df.columns]
                    if missing_cols:
                        self.logger.info(f"[Skipped] Model '{model}' missing columns {missing_cols} for key '{prop}'.")
                        skipped_models.append(model)
                        continue
                    temp = model_df[domain_keys + [prop]].copy()
                    temp.rename(columns={prop: model}, inplace=True)

                    # Store truth data separately if truth_column_name is provided
                    if truth_column_name and model == truth_column_name:
                        truth_df = temp
                    else:
                        dfs.append(temp)
            else:
                raise ValueError("Unsupported file format. Only .h5 and .csv are supported.")

            if not dfs:
                self.logger.info(f"[Warning] No models with property '{prop}'. Resulting DataFrame will be empty.")
                result[prop] = pd.DataFrame(columns=domain_keys + [m for m in models if m not in skipped_models])
                continue

            # Intersect domain for regular models only
            common_df = dfs[0]
            for other_df in dfs[1:]:
                common_df = pd.merge(common_df, other_df, on=domain_keys, how="inner")
            # Drop rows with NaN in any required column (for regular models)
            common_df = common_df.dropna()

            # Left join truth data if it exists and was specified
            if truth_df is not None:
                common_df = pd.merge(common_df, truth_df, on=domain_keys, how="left")

            result[prop] = common_df
            self.data = result
        return result

    def view_data(self, property_name=None, model_name=None):
        """
        View data flexibly based on input parameters.

        - No arguments: returns available property names and model names.
        - property_name only: returns the full DataFrame for that property.
        - model_name only: returns a dict mapping each property to the domain columns plus that model's values.
        - property_name + model_name: returns a Series of values for the model.

        :param property_name: Optional property name 
        :param model_name: Optional model name 
        :return: dict, DataFrame, or Series depending on input.
        """

        if not self.data:
            raise RuntimeError("No data loaded. Run `load_data(...)` first.")

        if property_name is None and model_name is None:
            props = list(self.data.keys())
            models = sorted(set(col for prop_df in self.data.values() for col in prop_df.columns if col not in self.domain_keys))

            return {
                "available_properties": props,
                "available_models": models
            }

        if model_name is not None and property_name is None:
            # Return a dictionary: {property: Series of model values}
            result = {}
            for prop, df in self.data.items():
                if model_name in df.columns:
                    cols = self.domain_keys + [model_name]
                    result[prop] = df[cols]
                else:
                    result[prop] = f"[Model '{model_name}' not available]"
            return result

        if property_name is not None:
            if property_name not in self.data:
                raise KeyError(f"Property '{property_name}' not found.")

            df = self.data[property_name]

            if model_name is None:
                return df  # Full property DataFrame

            if model_name not in df.columns:
                raise KeyError(f"Model '{model_name}' not found in property '{property_name}'.")

            return df[model_name]



    def separate_points_distance_allSets(self, list1, list2, distance1, distance2):
            """
            Separates points in list1 into three groups based on their proximity to any point in list2.

            :param list1: List of (x, y) tuples.
            :param list2: List of (x, y) tuples.
            :param distance1: Threshold distance for assignment to the training set.
            :param distance2: Threshold distance for assignment to the validation set.
            :return: Three lists of indices into list1 - train, validation, and test.
            """
            train = []
            validation=[]
            test = []

            train_list_coordinates=[]
            validation_list_coordinates=[]
            test_list_coordinates=[]

            for i in range(len(list1)):
                point1=list1[i]
                close = False
                for point2 in list2:
                    if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance1:
                        close = True
                        break
                if close:
                    train.append(point1)
                    train_list_coordinates.append(i)
                else:
                    close2=False
                    for point2 in list2:
                        if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance2:
                            close2 = True
                            break
                    if close2:
                        validation.append(point1)
                        validation_list_coordinates.append(i)
                    else:
                        test.append(point1)
                        test_list_coordinates.append(i)                

            return train_list_coordinates, validation_list_coordinates, test_list_coordinates

    def split_data(self, data_dict, property_name, splitting_algorithm="random", **kwargs):
        """
        Split data into training, validation, and testing sets using random or inside-to-outside logic.

        :param data_dict: Dictionary output from `load_data`, where keys are property names and values are DataFrames.
        :param property_name: The key in `data_dict` specifying which DataFrame to use for splitting.
        :param splitting_algorithm: 'random' (default) or 'inside_to_outside'.
        :param kwargs: Additional arguments depending on the chosen algorithm.
            For 'random': train_size, val_size, test_size
            For 'inside_to_outside': stable_points (list of (x, y)), distance1, distance2
        :return: Tuple of train, validation, test datasets as DataFrames.
        """
        if property_name not in data_dict:
            raise ValueError(f"Property '{property_name}' not found in the provided data dictionary.")

        data = data_dict[property_name]

        if isinstance(data, pd.DataFrame):
            indexable_data = data.reset_index(drop=True)
            point_list = list(indexable_data.itertuples(index=False, name=None))
        else:
            raise TypeError("Data for the specified property must be a pandas DataFrame.")

        if splitting_algorithm == "random":
            required = ['train_size', 'val_size', 'test_size']
            if not all(k in kwargs for k in required):
                raise ValueError(f"Missing required kwargs for 'random': {required}")

            train_size = kwargs['train_size']
            val_size = kwargs['val_size']
            test_size = kwargs['test_size']

            if not np.isclose(train_size + val_size + test_size, 1.0):
                raise ValueError("train_size + val_size + test_size must equal 1.0")

            # Random split using indexes
            train_idx, temp_idx = train_test_split(indexable_data.index, train_size=train_size, random_state=1)
            val_rel = val_size / (val_size + test_size)
            val_idx, test_idx = train_test_split(temp_idx, test_size=1 - val_rel, random_state=1)

        elif splitting_algorithm == "inside_to_outside":
            required = ['stable_points', 'distance1', 'distance2']
            if not all(k in kwargs for k in required):
                raise ValueError(f"Missing required kwargs for 'inside_to_outside': {required}")

            stable_points = kwargs['stable_points']
            distance1 = kwargs['distance1']
            distance2 = kwargs['distance2']

            train_idx, val_idx, test_idx = self.separate_points_distance_allSets(
                point_list, stable_points, distance1, distance2
            )
        else:
            raise ValueError("splitting_algorithm must be either 'random' or 'inside_to_outside'")

        train_data = indexable_data.iloc[train_idx]
        val_data = indexable_data.iloc[val_idx]
        test_data = indexable_data.iloc[test_idx]

        return train_data, val_data, test_data


    def get_subset(self, property_name, filters=None, models_to_include=None):
        """
        Return a filtered, wide-format DataFrame for a given property.

        :param property_name: Name of the property (e.g., "BE", "ChRad").
        :param filters: Dictionary of filtering rules applied to the domain columns (e.g., {"Z": (26, 28)}).
        :param models_to_include: Optional list of model names to retain in the output.
                                If None, all model columns are retained.
        :return: Filtered wide-format DataFrame with columns: domain keys + model columns.
        """
        if property_name not in self.data:
            raise ValueError(f"Property '{property_name}' not found in dataset.")

        df = self.data[property_name].copy()

        # Apply row-level filters (domain-based)
        if filters:
            for column, condition in filters.items():
                if column == 'multi' and callable(condition):
                    df = df[df.apply(condition, axis=1)]
                elif callable(condition):
                    df = df[condition(df[column])]
                elif isinstance(condition, tuple) and len(condition) == 2:
                    df = df[(df[column] >= condition[0]) & (df[column] <= condition[1])]
                elif isinstance(condition, list):
                    df = df[df[column].isin(condition)]
                else:
                    df = df[df[column] == condition]

        # Optionally restrict to a subset of models
        if models_to_include is not None:
            domain_keys = [col for col in ['N', 'Z'] if col in df.columns]
            allowed_cols = domain_keys + [m for m in models_to_include if m in df.columns]
            df = df[allowed_cols]

        return df

__init__(data_source=None, verbose=True)

Initialize the Dataset object.

:param data_source: Path to the data file (.h5 or .csv).
:param verbose: If True, display warnings and informational messages. Default is True.

Source code in pybmc/data.py
def __init__(self, data_source=None, verbose=True):
    """
    Initialize the Dataset object.

    :param data_source: Path to the data file (.h5 or .csv).
    :param verbose: If True, display warnings and informational messages. Default is True.
    """
    self.data_source = data_source
    self.data = {}  # Dictionary mapping property name to DataFrame
    self.verbose = verbose
    self.logger = logging.getLogger(__name__)
    if not self.logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
    self.logger.setLevel(logging.INFO if verbose else logging.WARNING)

get_subset(property_name, filters=None, models_to_include=None)

Return a filtered, wide-format DataFrame for a given property.

:param property_name: Name of the property (e.g., "BE", "ChRad").
:param filters: Dictionary of filtering rules applied to the domain columns (e.g., {"Z": (26, 28)}).
:param models_to_include: Optional list of model names to retain in the output. If None, all model columns are retained.
:return: Filtered wide-format DataFrame with columns: domain keys + model columns.
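
Illustrative usage, assuming load_data() has already populated the dataset (property and model names are placeholders):

# Keep Z between 26 and 28 (inclusive), even N only, and a single model column.
subset = dataset.get_subset(
    "BE",
    filters={
        "Z": (26, 28),              # tuple -> inclusive range on a domain column
        "N": lambda n: n % 2 == 0,  # callable -> boolean mask on that column
    },
    models_to_include=["model_A"],
)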

Source code in pybmc/data.py
def get_subset(self, property_name, filters=None, models_to_include=None):
    """
    Return a filtered, wide-format DataFrame for a given property.

    :param property_name: Name of the property (e.g., "BE", "ChRad").
    :param filters: Dictionary of filtering rules applied to the domain columns (e.g., {"Z": (26, 28)}).
    :param models_to_include: Optional list of model names to retain in the output.
                            If None, all model columns are retained.
    :return: Filtered wide-format DataFrame with columns: domain keys + model columns.
    """
    if property_name not in self.data:
        raise ValueError(f"Property '{property_name}' not found in dataset.")

    df = self.data[property_name].copy()

    # Apply row-level filters (domain-based)
    if filters:
        for column, condition in filters.items():
            if column == 'multi' and callable(condition):
                df = df[df.apply(condition, axis=1)]
            elif callable(condition):
                df = df[condition(df[column])]
            elif isinstance(condition, tuple) and len(condition) == 2:
                df = df[(df[column] >= condition[0]) & (df[column] <= condition[1])]
            elif isinstance(condition, list):
                df = df[df[column].isin(condition)]
            else:
                df = df[df[column] == condition]

    # Optionally restrict to a subset of models
    if models_to_include is not None:
        domain_keys = [col for col in ['N', 'Z'] if col in df.columns]
        allowed_cols = domain_keys + [m for m in models_to_include if m in df.columns]
        df = df[allowed_cols]

    return df

load_data(models, keys=None, domain_keys=None, model_column='model', truth_column_name=None)

Load data for each property and return a dictionary of synchronized DataFrames. Each DataFrame has columns: domain_keys + one column per model for that property.

Parameters:

    models (list, required): List of model names (for HDF5 keys or filtering CSV).
    keys (list, default None): List of property names to extract (each will be a key in the output dict).
    domain_keys (list, default None): List of columns used to define the common domain (default ['N', 'Z']).
    model_column (str, default 'model'): Name of the column in the CSV that identifies which model each row belongs to. Only used for CSV files; ignored for HDF5 files.
    truth_column_name (str, default None): Name of the truth model. If provided, the truth data will be left-joined to the common domain of the other models, allowing the truth data to have a smaller domain than the models.

Returns:

    dict: Dictionary where each key is a property name and each value is a DataFrame with columns: domain_keys + one column per model for that property. The DataFrames are synchronized to the intersection of the domains for all models. If truth_column_name is provided, truth data is left-joined (may have NaN values).

Supports both .h5 and .csv files.
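
For a long-format CSV in which a single column tags the model for each row, the call is the same except for model_column (the file name and model names are again placeholders):

from pybmc.data import Dataset

csv_dataset = Dataset("predictions.csv")
data_dict = csv_dataset.load_data(
    models=["model_A", "model_B", "model_C", "exp"],
    keys=["BE", "ChRad"],
    domain_keys=["N", "Z"],
    model_column="model",          # column in the CSV identifying each row's model
    truth_column_name="exp",
)
data_dict["BE"].head()             # columns: N, Z, model_A, model_B, model_C, exp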

Source code in pybmc/data.py
def load_data(self, models, keys=None, domain_keys=None, model_column='model', truth_column_name=None):
    """
    Load data for each property and return a dictionary of synchronized DataFrames.
    Each DataFrame has columns: domain_keys + one column per model for that property.

    Parameters:
        models (list): List of model names (for HDF5 keys or filtering CSV).
        keys (list): List of property names to extract (each will be a key in the output dict).
        domain_keys (list, optional): List of columns used to define the common domain (default ['N', 'Z']).
        model_column (str, optional): Name of the column in the CSV that identifies which model each row belongs to.
                                      Only used for CSV files; ignored for HDF5 files.
        truth_column_name (str, optional): Name of the truth model. If provided, the truth data will be 
                                           left-joined to the common domain of the other models, allowing 
                                           the truth data to have a smaller domain than the models.

    Returns:
        dict: Dictionary where each key is a property name and each value is a DataFrame with columns:
              domain_keys + one column per model for that property.
              The DataFrames are synchronized to the intersection of the domains for all models.
              If truth_column_name is provided, truth data is left-joined (may have NaN values).

    Supports both .h5 and .csv files.
    """
    self.domain_keys = domain_keys 

    if self.data_source is None:
        raise ValueError("Data source must be specified.")
    if not os.path.exists(self.data_source):
        raise FileNotFoundError(f"Data source '{self.data_source}' not found.")
    if keys is None:
        raise ValueError("You must specify which properties to extract via 'keys'.")

    result = {}

    for prop in keys:
        dfs = []
        truth_df = None
        skipped_models = []

        # Separate regular models from truth model
        regular_models = [m for m in models if m != truth_column_name]

        if self.data_source.endswith('.h5'):
            for model in models:
                df = pd.read_hdf(self.data_source, key=model)
                # Check required columns
                missing_cols = [col for col in domain_keys + [prop] if col not in df.columns]
                if missing_cols:
                    self.logger.info(f"[Skipped] Model '{model}' missing columns {missing_cols} for property '{prop}'.")
                    skipped_models.append(model)
                    continue
                temp = df[domain_keys + [prop]].copy()
                temp.rename(columns={prop: model}, inplace=True) # type: ignore

                # Store truth data separately if truth_column_name is provided
                if truth_column_name and model == truth_column_name:
                    truth_df = temp
                else:
                    dfs.append(temp)

        elif self.data_source.endswith('.csv'):
            df = pd.read_csv(self.data_source)
            for model in models:
                if model_column not in df.columns:
                    raise ValueError(f"Expected column '{model_column}' not found in CSV.")
                model_df = df[df[model_column] == model]
                missing_cols = [col for col in domain_keys + [prop] if col not in model_df.columns]
                if missing_cols:
                    self.logger.info(f"[Skipped] Model '{model}' missing columns {missing_cols} for key '{prop}'.")
                    skipped_models.append(model)
                    continue
                temp = model_df[domain_keys + [prop]].copy()
                temp.rename(columns={prop: model}, inplace=True)

                # Store truth data separately if truth_column_name is provided
                if truth_column_name and model == truth_column_name:
                    truth_df = temp
                else:
                    dfs.append(temp)
        else:
            raise ValueError("Unsupported file format. Only .h5 and .csv are supported.")

        if not dfs:
            self.logger.info(f"[Warning] No models with property '{prop}'. Resulting DataFrame will be empty.")
            result[prop] = pd.DataFrame(columns=domain_keys + [m for m in models if m not in skipped_models])
            continue

        # Intersect domain for regular models only
        common_df = dfs[0]
        for other_df in dfs[1:]:
            common_df = pd.merge(common_df, other_df, on=domain_keys, how="inner")
        # Drop rows with NaN in any required column (for regular models)
        common_df = common_df.dropna()

        # Left join truth data if it exists and was specified
        if truth_df is not None:
            common_df = pd.merge(common_df, truth_df, on=domain_keys, how="left")

        result[prop] = common_df
        self.data = result
    return result

separate_points_distance_allSets(list1, list2, distance1, distance2)

Separates points in list1 into three groups based on their proximity to any point in list2.

:param list1: List of (x, y) tuples.
:param list2: List of (x, y) tuples.
:param distance1: Threshold distance for assignment to the training set.
:param distance2: Threshold distance for assignment to the validation set.
:return: Three lists of indices into list1 - train, validation, and test.
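
A small numeric illustration of the grouping rule, using the dataset from the examples above (the coordinates and distances are made up): indices within distance1 of any stable point go to train, within distance2 to validation, and the rest to test.

points = [(20, 20), (21, 21), (30, 30)]
stable = [(20, 20)]
train_idx, val_idx, test_idx = dataset.separate_points_distance_allSets(
    points, stable, distance1=1.5, distance2=5.0
)
# train_idx == [0, 1], val_idx == [], test_idx == [2]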

Source code in pybmc/data.py
def separate_points_distance_allSets(self, list1, list2, distance1, distance2):
        """
        Separates points in list1 into three groups based on their proximity to any point in list2.

        :param list1: List of (x, y) tuples.
        :param list2: List of (x, y) tuples.
        :param distance1: Threshold distance for assignment to the training set.
        :param distance2: Threshold distance for assignment to the validation set.
        :return: Three lists of indices into list1 - train, validation, and test.
        """
        train = []
        validation=[]
        test = []

        train_list_coordinates=[]
        validation_list_coordinates=[]
        test_list_coordinates=[]

        for i in range(len(list1)):
            point1=list1[i]
            close = False
            for point2 in list2:
                if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance1:
                    close = True
                    break
            if close:
                train.append(point1)
                train_list_coordinates.append(i)
            else:
                close2=False
                for point2 in list2:
                    if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance2:
                        close2 = True
                        break
                if close2:
                    validation.append(point1)
                    validation_list_coordinates.append(i)
                else:
                    test.append(point1)
                    test_list_coordinates.append(i)                

        return train_list_coordinates, validation_list_coordinates, test_list_coordinates

split_data(data_dict, property_name, splitting_algorithm='random', **kwargs)

Split data into training, validation, and testing sets using random or inside-to-outside logic.

:param data_dict: Dictionary output from load_data, where keys are property names and values are DataFrames.
:param property_name: The key in data_dict specifying which DataFrame to use for splitting.
:param splitting_algorithm: 'random' (default) or 'inside_to_outside'.
:param kwargs: Additional arguments depending on the chosen algorithm.
    For 'random': train_size, val_size, test_size
    For 'inside_to_outside': stable_points (list of (x, y)), distance1, distance2
:return: Tuple of train, validation, test datasets as DataFrames.
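
An illustrative random split of the "BE" DataFrame produced by load_data(); the three sizes must sum to 1.0. For the 'inside_to_outside' algorithm, pass stable_points, distance1, and distance2 instead:

train_df, val_df, test_df = dataset.split_data(
    data_dict, "BE",
    splitting_algorithm="random",
    train_size=0.7, val_size=0.15, test_size=0.15,
)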

Source code in pybmc/data.py
def split_data(self, data_dict, property_name, splitting_algorithm="random", **kwargs):
    """
    Split data into training, validation, and testing sets using random or inside-to-outside logic.

    :param data_dict: Dictionary output from `load_data`, where keys are property names and values are DataFrames.
    :param property_name: The key in `data_dict` specifying which DataFrame to use for splitting.
    :param splitting_algorithm: 'random' (default) or 'inside_to_outside'.
    :param kwargs: Additional arguments depending on the chosen algorithm.
        For 'random': train_size, val_size, test_size
        For 'inside_to_outside': stable_points (list of (x, y)), distance1, distance2
    :return: Tuple of train, validation, test datasets as DataFrames.
    """
    if property_name not in data_dict:
        raise ValueError(f"Property '{property_name}' not found in the provided data dictionary.")

    data = data_dict[property_name]

    if isinstance(data, pd.DataFrame):
        indexable_data = data.reset_index(drop=True)
        point_list = list(indexable_data.itertuples(index=False, name=None))
    else:
        raise TypeError("Data for the specified property must be a pandas DataFrame.")

    if splitting_algorithm == "random":
        required = ['train_size', 'val_size', 'test_size']
        if not all(k in kwargs for k in required):
            raise ValueError(f"Missing required kwargs for 'random': {required}")

        train_size = kwargs['train_size']
        val_size = kwargs['val_size']
        test_size = kwargs['test_size']

        if not np.isclose(train_size + val_size + test_size, 1.0):
            raise ValueError("train_size + val_size + test_size must equal 1.0")

        # Random split using indexes
        train_idx, temp_idx = train_test_split(indexable_data.index, train_size=train_size, random_state=1)
        val_rel = val_size / (val_size + test_size)
        val_idx, test_idx = train_test_split(temp_idx, test_size=1 - val_rel, random_state=1)

    elif splitting_algorithm == "inside_to_outside":
        required = ['stable_points', 'distance1', 'distance2']
        if not all(k in kwargs for k in required):
            raise ValueError(f"Missing required kwargs for 'inside_to_outside': {required}")

        stable_points = kwargs['stable_points']
        distance1 = kwargs['distance1']
        distance2 = kwargs['distance2']

        train_idx, val_idx, test_idx = self.separate_points_distance_allSets(
            point_list, stable_points, distance1, distance2
        )
    else:
        raise ValueError("splitting_algorithm must be either 'random' or 'inside_to_outside'")

    train_data = indexable_data.iloc[train_idx]
    val_data = indexable_data.iloc[val_idx]
    test_data = indexable_data.iloc[test_idx]

    return train_data, val_data, test_data

view_data(property_name=None, model_name=None)

View data flexibly based on input parameters.

  • No arguments: returns available property names and model names.
  • property_name only: returns the full DataFrame for that property.
  • model_name only: returns a dict mapping each property to the domain columns plus that model's values.
  • property_name + model_name: returns a Series of values for the model.

:param property_name: Optional property name.
:param model_name: Optional model name.
:return: dict, DataFrame, or Series depending on input.
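
Illustrative calls, assuming load_data() has already been run (property and model names are placeholders):

overview = dataset.view_data()                         # available properties and models
be_df = dataset.view_data("BE")                        # full wide DataFrame for 'BE'
model_a_be = dataset.view_data("BE", "model_A")        # Series of model_A's 'BE' values
model_a_all = dataset.view_data(model_name="model_A")  # {property: domain cols + model_A}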

Source code in pybmc/data.py
def view_data(self, property_name=None, model_name=None):
    """
    View data flexibly based on input parameters.

    - No arguments: returns available property names and model names.
    - property_name only: returns the full DataFrame for that property.
    - model_name only: returns a dict mapping each property to the domain columns plus that model's values.
    - property_name + model_name: returns a Series of values for the model.

    :param property_name: Optional property name 
    :param model_name: Optional model name 
    :return: dict, DataFrame, or Series depending on input.
    """

    if not self.data:
        raise RuntimeError("No data loaded. Run `load_data(...)` first.")

    if property_name is None and model_name is None:
        props = list(self.data.keys())
        models = sorted(set(col for prop_df in self.data.values() for col in prop_df.columns if col not in self.domain_keys))

        return {
            "available_properties": props,
            "available_models": models
        }

    if model_name is not None and property_name is None:
        # Return a dictionary: {property: Series of model values}
        result = {}
        for prop, df in self.data.items():
            if model_name in df.columns:
                cols = self.domain_keys + [model_name]
                result[prop] = df[cols]
            else:
                result[prop] = f"[Model '{model_name}' not available]"
        return result

    if property_name is not None:
        if property_name not in self.data:
            raise KeyError(f"Property '{property_name}' not found.")

        df = self.data[property_name]

        if model_name is None:
            return df  # Full property DataFrame

        if model_name not in df.columns:
            raise KeyError(f"Model '{model_name}' not found in property '{property_name}'.")

        return df[model_name]

BayesianModelCombination

This class performs Bayesian model combination (BMC) on the set of models chosen from the Dataset class. The workflow covers: the orthogonalization step, Bayesian inference on the training data extracted from the Dataset class, and predictions for selected isotopes.
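
An end-to-end sketch continuing from the Dataset examples above; the model names, component count, and iteration count are illustrative only:

from pybmc.bmc import BayesianModelCombination

bmc = BayesianModelCombination(
    models_list=["model_A", "model_B", "model_C"],
    data_dict=data_dict,            # output of Dataset.load_data()
    truth_column_name="exp",
)
bmc.orthogonalize("BE", train_df, components_kept=2)   # train_df from split_data()
bmc.train({"iterations": 20000})
rndm_m, lower_df, median_df, upper_df = bmc.predict("BE")
coverage_curve = bmc.evaluate()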

Source code in pybmc/bmc.py
class BayesianModelCombination:
    """
    Performs Bayesian model combination (BMC) on the set of models chosen
    from the Dataset class. The workflow covers:
    + Orthogonalization step.
    + Bayesian inference on the training data extracted from the Dataset class.
    + Predictions for selected isotopes.
    """

    def __init__(self, models_list, data_dict, truth_column_name, weights=None):
        """ 
        Initialize the BayesianModelCombination class.

        :param models_list: List of model names
        :param data_dict: Dictionary from `load_data()` where each key is a property name and each value is a wide DataFrame (domain columns + one column per model)
        :param truth_column_name: Name of the column containing the truth values.
        :param weights: Optional initial weights for the models.
        """

        if not isinstance(models_list, list) or not all(isinstance(model, str) for model in models_list):
            raise ValueError("The 'models' should be a list of model names (strings) for Bayesian Combination.")    
        if not isinstance(data_dict, dict) or not all(isinstance(df, pd.DataFrame) for df in data_dict.values()):
            raise ValueError("The 'data_dict' should be a dictionary of pandas DataFrames, one per property.")

        self.data_dict = data_dict 
        self.models_list = models_list 
        self.models = [m for m in models_list if m != 'truth']
        self.weights = weights if weights is not None else None 
        self.truth_column_name = truth_column_name


    def orthogonalize(self, property, train_df, components_kept):
        """
        Perform orthogonalization for the specified property using training data.

        :param property: The nuclear property to orthogonalize on (e.g., 'BE').
        :param train_df: Training DataFrame returned by split_data()
        :param components_kept: Number of SVD components to retain.
        """
        # Store selected property
        self.current_property = property

        # Extract the relevant DataFrame for that property
        df = self.data_dict[property].copy()
        self.selected_models_dataset = df  # Store for train() and predict()

        # Extract model outputs (only the model columns)
        models_output_train = train_df[self.models]
        model_predictions_train = models_output_train.values

        # Mean prediction across models (per nucleus)
        predictions_mean_train = np.mean(model_predictions_train, axis=1)

        # Experimental truth values for the property
        centered_experiment_train = train_df[self.truth_column_name].values - predictions_mean_train

        # Center model predictions
        model_predictions_train_centered = model_predictions_train - predictions_mean_train[:, None]

        # Perform SVD
        U, S, Vt = np.linalg.svd(model_predictions_train_centered)

        # Dimensionality reduction
        U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept) #type: ignore

        # Save for training
        self.centered_experiment_train = centered_experiment_train
        self.U_hat = U_hat
        self.Vt_hat = Vt_hat
        self.S_hat = S_hat
        self.Vt_hat_normalized = Vt_hat_normalized
        self._predictions_mean_train = predictions_mean_train


    def train(self, training_options=None):
        """
        Train the model combination using training data and optional training parameters.

        :param training_options: Dictionary of training options. Keys:
            - 'iterations': (int) Number of Gibbs iterations (default 50000)
            - 'b_mean_prior': (np.ndarray) Prior mean vector (default zeros)
            - 'b_mean_cov': (np.ndarray) Prior covariance matrix (default diag(S_hat²))
            - 'nu0_chosen': (float) Degrees of freedom for variance prior (default 1.0)
            - 'sigma20_chosen': (float) Prior variance (default 0.02)
        """
        if training_options is None:
            training_options = {}

        iterations = training_options.get('iterations', 50000)
        num_components = self.U_hat.shape[1]
        S_hat = self.S_hat

        b_mean_prior = training_options.get('b_mean_prior', np.zeros(num_components))
        b_mean_cov = training_options.get('b_mean_cov', np.diag(S_hat**2))
        nu0_chosen = training_options.get('nu0_chosen', 1.0)
        sigma20_chosen = training_options.get('sigma20_chosen', 0.02)

        self.samples = gibbs_sampler(self.centered_experiment_train, self.U_hat, iterations, [b_mean_prior, b_mean_cov, nu0_chosen, sigma20_chosen])



    def predict(self, property):
        """
        Predict a specified property using the model weights learned during training.

        :param property: The property name to predict (e.g., 'ChRad').
        :return:
            - rndm_m: array of shape (n_samples, n_points), full posterior draws
            - lower_df: DataFrame with columns domain_keys + ['Predicted_Lower']
            - median_df: DataFrame with columns domain_keys + ['Predicted_Median']
            - upper_df: DataFrame with columns domain_keys + ['Predicted_Upper']
        """
        if self.samples is None or self.Vt_hat is None:
            raise ValueError("Must call `orthogonalize()` and `train()` before predicting.")

        if property not in self.data_dict:
            raise KeyError(f"Property '{property}' not found in data_dict.")

        df = self.data_dict[property].copy()

        # Infer domain and model columns
        full_model_cols = self.models
        domain_keys = [col for col in df.columns if col not in full_model_cols and col != self.truth_column_name]

        # Determine which models are present
        available_models = [m for m in full_model_cols if m in df.columns]

        if len(available_models) == 0:
            raise ValueError("No available trained models are present in prediction DataFrame.")

        # Filter predictions and model weights
        model_preds = df[available_models].values
        domain_df = df[domain_keys].reset_index(drop=True)

        rndm_m, (lower, median, upper) = rndm_m_random_calculator(model_preds, self.samples, self.Vt_hat)

        # Build output DataFrames
        lower_df = domain_df.copy()

        lower_df["Predicted_Lower"] = lower

        median_df = domain_df.copy()
        median_df["Predicted_Median"] = median

        upper_df = domain_df.copy()
        upper_df["Predicted_Upper"] = upper

        return rndm_m, lower_df, median_df, upper_df

    def evaluate(self, domain_filter=None):
        """
        Evaluate the model combination using coverage calculation.

        :param domain_filter: dict with optional domain key ranges, e.g., {"Z": (20, 30), "N": (20, 40)}
        :return: coverage list for each percentile
        """
        df = self.data_dict[self.current_property]

        if domain_filter:
            # Inline optimized filtering
            for col, cond in domain_filter.items():
                if col == 'multi' and callable(cond):
                    df = df[df.apply(cond, axis=1)]
                elif callable(cond):
                    df = df[cond(df[col])]
                elif isinstance(cond, tuple) and len(cond) == 2:
                    df = df[df[col].between(*cond)]
                elif isinstance(cond, list):
                    df = df[df[col].isin(cond)]
                else:
                    df = df[df[col] == cond]

        preds = df[self.models].to_numpy()
        rndm_m, (lower, median, upper) = rndm_m_random_calculator(preds, self.samples, self.Vt_hat)

        return coverage(np.arange(0, 101, 5), rndm_m, df, truth_column=self.truth_column_name)

__init__(models_list, data_dict, truth_column_name, weights=None)

Initialize the BayesianModelCombination class.

:param models_list: List of model names.
:param data_dict: Dictionary from load_data() where each key is a property name and each value is a wide DataFrame (domain columns + one column per model).
:param truth_column_name: Name of the column containing the truth values.
:param weights: Optional initial weights for the models.

Source code in pybmc/bmc.py
def __init__(self, models_list, data_dict, truth_column_name, weights=None):
    """ 
    Initialize the BayesianModelCombination class.

    :param models_list: List of model names
    :param data_dict: Dictionary from `load_data()` where each key is a property name and each value is a wide DataFrame (domain columns + one column per model)
    :param truth_column_name: Name of the column containing the truth values.
    :param weights: Optional initial weights for the models.
    """

    if not isinstance(models_list, list) or not all(isinstance(model, str) for model in models_list):
        raise ValueError("The 'models' should be a list of model names (strings) for Bayesian Combination.")    
    if not isinstance(data_dict, dict) or not all(isinstance(df, pd.DataFrame) for df in data_dict.values()):
        raise ValueError("The 'data_dict' should be a dictionary of pandas DataFrames, one per property.")

    self.data_dict = data_dict 
    self.models_list = models_list 
    self.models = [m for m in models_list if m != 'truth']
    self.weights = weights if weights is not None else None 
    self.truth_column_name = truth_column_name

evaluate(domain_filter=None)

Evaluate the model combination using coverage calculation.

:param domain_filter: dict with optional domain key ranges, e.g., {"Z": (20, 30), "N": (20, 40)}
:return: coverage list for each percentile
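
A hypothetical coverage check restricted to a region of the nuclear chart, continuing the workflow sketch above:

cov = bmc.evaluate(domain_filter={"Z": (20, 30), "N": (20, 40)})
# cov[i] is the empirical coverage (%) at credible level 0, 5, ..., 100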

Source code in pybmc/bmc.py
def evaluate(self, domain_filter=None):
    """
    Evaluate the model combination using coverage calculation.

    :param domain_filter: dict with optional domain key ranges, e.g., {"Z": (20, 30), "N": (20, 40)}
    :return: coverage list for each percentile
    """
    df = self.data_dict[self.current_property]

    if domain_filter:
        # Inline optimized filtering
        for col, cond in domain_filter.items():
            if col == 'multi' and callable(cond):
                df = df[df.apply(cond, axis=1)]
            elif callable(cond):
                df = df[cond(df[col])]
            elif isinstance(cond, tuple) and len(cond) == 2:
                df = df[df[col].between(*cond)]
            elif isinstance(cond, list):
                df = df[df[col].isin(cond)]
            else:
                df = df[df[col] == cond]

    preds = df[self.models].to_numpy()
    rndm_m, (lower, median, upper) = rndm_m_random_calculator(preds, self.samples, self.Vt_hat)

    return coverage(np.arange(0, 101, 5), rndm_m, df, truth_column=self.truth_column_name)

orthogonalize(property, train_df, components_kept)

Perform orthogonalization for the specified property using training data.

:param property: The nuclear property to orthogonalize on (e.g., 'BE').
:param train_df: Training DataFrame returned by split_data().
:param components_kept: Number of SVD components to retain.
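
After orthogonalization the reduced SVD factors are stored on the object; inspecting them can guide the choice of components_kept (continuing the sketch above):

bmc.orthogonalize("BE", train_df, components_kept=2)
print(bmc.S_hat)         # retained singular values
print(bmc.U_hat.shape)   # (n_training_points, components_kept)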

Source code in pybmc/bmc.py
def orthogonalize(self, property, train_df, components_kept):
    """
    Perform orthogonalization for the specified property using training data.

    :param property: The nuclear property to orthogonalize on (e.g., 'BE').
    :param train_df: Training DataFrame returned by split_data()
    :param components_kept: Number of SVD components to retain.
    """
    # Store selected property
    self.current_property = property

    # Extract the relevant DataFrame for that property
    df = self.data_dict[property].copy()
    self.selected_models_dataset = df  # Store for train() and predict()

    # Extract model outputs (only the model columns)
    models_output_train = train_df[self.models]
    model_predictions_train = models_output_train.values

    # Mean prediction across models (per nucleus)
    predictions_mean_train = np.mean(model_predictions_train, axis=1)

    # Experimental truth values for the property
    centered_experiment_train = train_df[self.truth_column_name].values - predictions_mean_train

    # Center model predictions
    model_predictions_train_centered = model_predictions_train - predictions_mean_train[:, None]

    # Perform SVD
    U, S, Vt = np.linalg.svd(model_predictions_train_centered)

    # Dimensionality reduction
    U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept) #type: ignore

    # Save for training
    self.centered_experiment_train = centered_experiment_train
    self.U_hat = U_hat
    self.Vt_hat = Vt_hat
    self.S_hat = S_hat
    self.Vt_hat_normalized = Vt_hat_normalized
    self._predictions_mean_train = predictions_mean_train

predict(property)

Predict a specified property using the model weights learned during training.

:param property: The property name to predict (e.g., 'ChRad').
:return:
    - rndm_m: array of shape (n_samples, n_points), full posterior draws
    - lower_df: DataFrame with columns domain_keys + ['Predicted_Lower']
    - median_df: DataFrame with columns domain_keys + ['Predicted_Median']
    - upper_df: DataFrame with columns domain_keys + ['Predicted_Upper']
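
Illustrative assembly of the three interval DataFrames into one summary table (they share the same row order because all are built from the same domain columns):

rndm_m, lower_df, median_df, upper_df = bmc.predict("BE")
summary = median_df.copy()
summary["Predicted_Lower"] = lower_df["Predicted_Lower"]
summary["Predicted_Upper"] = upper_df["Predicted_Upper"]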

Source code in pybmc/bmc.py
def predict(self, property):
    """
    Predict a specified property using the model weights learned during training.

    :param property: The property name to predict (e.g., 'ChRad').
    :return:
        - rndm_m: array of shape (n_samples, n_points), full posterior draws
        - lower_df: DataFrame with columns domain_keys + ['Predicted_Lower']
        - median_df: DataFrame with columns domain_keys + ['Predicted_Median']
        - upper_df: DataFrame with columns domain_keys + ['Predicted_Upper']
    """
    if self.samples is None or self.Vt_hat is None:
        raise ValueError("Must call `orthogonalize()` and `train()` before predicting.")

    if property not in self.data_dict:
        raise KeyError(f"Property '{property}' not found in data_dict.")

    df = self.data_dict[property].copy()

    # Infer domain and model columns
    full_model_cols = self.models
    domain_keys = [col for col in df.columns if col not in full_model_cols and col != self.truth_column_name]

    # Determine which models are present
    available_models = [m for m in full_model_cols if m in df.columns]

    if len(available_models) == 0:
        raise ValueError("No available trained models are present in prediction DataFrame.")

    # Filter predictions and model weights
    model_preds = df[available_models].values
    domain_df = df[domain_keys].reset_index(drop=True)

    rndm_m, (lower, median, upper) = rndm_m_random_calculator(model_preds, self.samples, self.Vt_hat)

    # Build output DataFrames
    lower_df = domain_df.copy()

    lower_df["Predicted_Lower"] = lower

    median_df = domain_df.copy()
    median_df["Predicted_Median"] = median

    upper_df = domain_df.copy()
    upper_df["Predicted_Upper"] = upper

    return rndm_m, lower_df, median_df, upper_df

train(training_options=None)

Train the model combination using training data and optional training parameters.

:param training_options: Dictionary of training options. Keys:
    - 'iterations': (int) Number of Gibbs iterations (default 50000)
    - 'b_mean_prior': (np.ndarray) Prior mean vector (default zeros)
    - 'b_mean_cov': (np.ndarray) Prior covariance matrix (default diag(S_hat²))
    - 'nu0_chosen': (float) Degrees of freedom for variance prior (default 1.0)
    - 'sigma20_chosen': (float) Prior variance (default 0.02)
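
Illustrative training options; anything omitted falls back to the defaults listed above, and b_mean_prior must match the number of retained components:

import numpy as np

bmc.train({
    "iterations": 50000,
    "b_mean_prior": np.zeros(2),
    "nu0_chosen": 1.0,
    "sigma20_chosen": 0.02,
})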

Source code in pybmc/bmc.py
def train(self, training_options=None):
    """
    Train the model combination using training data and optional training parameters.

    :param training_options: Dictionary of training options. Keys:
        - 'iterations': (int) Number of Gibbs iterations (default 50000)
        - 'b_mean_prior': (np.ndarray) Prior mean vector (default zeros)
        - 'b_mean_cov': (np.ndarray) Prior covariance matrix (default diag(S_hat²))
        - 'nu0_chosen': (float) Degrees of freedom for variance prior (default 1.0)
        - 'sigma20_chosen': (float) Prior variance (default 0.02)
    """
    if training_options is None:
        training_options = {}

    iterations = training_options.get('iterations', 50000)
    num_components = self.U_hat.shape[1]
    S_hat = self.S_hat

    b_mean_prior = training_options.get('b_mean_prior', np.zeros(num_components))
    b_mean_cov = training_options.get('b_mean_cov', np.diag(S_hat**2))
    nu0_chosen = training_options.get('nu0_chosen', 1.0)
    sigma20_chosen = training_options.get('sigma20_chosen', 0.02)

    self.samples = gibbs_sampler(self.centered_experiment_train, self.U_hat, iterations, [b_mean_prior, b_mean_cov, nu0_chosen, sigma20_chosen])

USVt_hat_extraction(U, S, Vt, components_kept)

Extracts reduced-dimensionality matrices from SVD results.

Parameters:

    U (numpy.ndarray, required): Left singular vectors.
    S (numpy.ndarray, required): Singular values.
    Vt (numpy.ndarray, required): Right singular vectors (transposed).
    components_kept (int, required): Number of components to retain.

Returns:

    tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:
        - U_hat (numpy.ndarray): Reduced left singular vectors.
        - S_hat (numpy.ndarray): Retained singular values.
        - Vt_hat (numpy.ndarray): Retained right singular vectors scaled by the inverse singular values.
        - Vt_hat_normalized (numpy.ndarray): Retained right singular vectors (unscaled).
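
A standalone sketch with a toy matrix; the shapes follow numpy.linalg.svd with full_matrices=True, as in BayesianModelCombination.orthogonalize():

import numpy as np
from pybmc.inference_utils import USVt_hat_extraction

A = np.random.default_rng(0).normal(size=(6, 3))   # 6 data points, 3 models
U, S, Vt = np.linalg.svd(A)
U_hat, S_hat, Vt_hat, Vt_hat_norm = USVt_hat_extraction(U, S, Vt, components_kept=2)
# U_hat: (6, 2), S_hat: (2,), Vt_hat and Vt_hat_norm: (2, 3)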

Source code in pybmc/inference_utils.py
def USVt_hat_extraction(U, S, Vt, components_kept):
    """
    Extracts reduced-dimensionality matrices from SVD results.

    Args:
        U (numpy.ndarray): Left singular vectors.
        S (numpy.ndarray): Singular values.
        Vt (numpy.ndarray): Right singular vectors (transposed).
        components_kept (int): Number of components to retain.

    Returns:
        tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:
            - `U_hat` (numpy.ndarray): Reduced left singular vectors.
            - `S_hat` (numpy.ndarray): Retained singular values.
            - `Vt_hat` (numpy.ndarray): Retained right singular vectors scaled by the inverse singular values.
            - `Vt_hat_normalized` (numpy.ndarray): Retained right singular vectors (unscaled).
    """
    U_hat = np.array([U.T[i] for i in range(components_kept)]).T
    S_hat = S[:components_kept]
    Vt_hat = np.array([Vt[i] / S[i] for i in range(components_kept)])
    Vt_hat_normalized = np.array([Vt[i] for i in range(components_kept)])
    return U_hat, S_hat, Vt_hat, Vt_hat_normalized

gibbs_sampler(y, X, iterations, prior_info)

Performs Gibbs sampling for Bayesian linear regression.

Parameters:

    y (numpy.ndarray, required): Response vector (centered).
    X (numpy.ndarray, required): Design matrix.
    iterations (int, required): Number of sampling iterations.
    prior_info (tuple[numpy.ndarray, numpy.ndarray, float, float], required): Prior parameters:
        - b_mean_prior (numpy.ndarray): Prior mean for coefficients.
        - b_mean_cov (numpy.ndarray): Prior covariance matrix.
        - nu0 (float): Prior degrees of freedom for variance.
        - sigma20 (float): Prior variance.

Returns:

    numpy.ndarray: Posterior samples [beta, sigma].
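
A toy regression sketch with synthetic data; the prior values mirror the defaults used by BayesianModelCombination.train():

import numpy as np
from pybmc.inference_utils import gibbs_sampler

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 2))                        # design matrix
y = X @ np.array([0.5, -0.2]) + rng.normal(scale=0.1, size=50)

prior_info = [np.zeros(2), np.eye(2), 1.0, 0.02]    # b_mean_prior, b_mean_cov, nu0, sigma20
samples = gibbs_sampler(y, X, iterations=2000, prior_info=prior_info)
# samples has shape (2000, 3): two beta coefficients plus the noise standard deviation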

Source code in pybmc/inference_utils.py
def gibbs_sampler(y, X, iterations, prior_info):
    """
    Performs Gibbs sampling for Bayesian linear regression.

    Args:
        y (numpy.ndarray): Response vector (centered).
        X (numpy.ndarray): Design matrix.
        iterations (int): Number of sampling iterations.
        prior_info (tuple[numpy.ndarray, numpy.ndarray, float, float]): Prior parameters:
            - `b_mean_prior` (numpy.ndarray): Prior mean for coefficients.
            - `b_mean_cov` (numpy.ndarray): Prior covariance matrix.
            - `nu0` (float): Prior degrees of freedom for variance.
            - `sigma20` (float): Prior variance.

    Returns:
        numpy.ndarray: Posterior samples `[beta, sigma]`.
    """
    b_mean_prior, b_mean_cov, nu0, sigma20 = prior_info
    b_mean_cov_inv = np.linalg.inv(b_mean_cov)
    n = len(y)

    X_T_X = X.T.dot(X)
    X_T_X_inv = np.linalg.inv(X_T_X)

    b_data = X_T_X_inv.dot(X.T).dot(y)
    supermodel = X.dot(b_data)
    residuals = y - supermodel
    sigma2 = np.sum(residuals**2) / len(residuals)
    cov_matrix = sigma2 * X_T_X_inv

    samples = []

    # Initialize sigma2 with a small positive value to avoid division by zero
    sigma2 = max(sigma2, 1e-6)

    for i in range(iterations):
        # Regularize the covariance matrix to ensure it is positive definite
        cov_matrix = np.linalg.inv(X_T_X / sigma2 + b_mean_cov_inv + np.eye(X_T_X.shape[0]) * 1e-6)
        mean_vector = cov_matrix.dot(
            b_mean_cov_inv.dot(b_mean_prior) + X.T.dot(y) / sigma2
        )
        b_current = np.random.multivariate_normal(mean_vector, cov_matrix)

        # Sample from the conditional posterior of sigma2 given bs and data
        supermodel = X.dot(b_current)
        residuals = y - supermodel
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 + np.sum(residuals**2)) / 2.0
        sigma2 = max(1 / np.random.default_rng().gamma(shape_post, 1 / scale_post), 1e-6)

        samples.append(np.append(b_current, np.sqrt(sigma2)))

    return np.array(samples)

gibbs_sampler_simplex(y, X, Vt_hat, S_hat, iterations, prior_info, burn=10000, stepsize=0.001)

Performs Gibbs sampling with simplex constraints on model weights.

Parameters:

    y (numpy.ndarray, required): Centered response vector.
    X (numpy.ndarray, required): Design matrix of principal components.
    Vt_hat (numpy.ndarray, required): Normalized right singular vectors.
    S_hat (numpy.ndarray, required): Singular values.
    iterations (int, required): Number of sampling iterations.
    prior_info (list[float], required): [nu0, sigma20] - prior parameters for variance.
    burn (int, default 10000): Burn-in iterations.
    stepsize (float, default 0.001): Proposal step size.

Returns:

    numpy.ndarray: Posterior samples [beta, sigma].
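
An illustration of how sampled coefficients map to simplex weights inside the sampler (shapes and values are made up; the row of Vt_hat is chosen to sum to zero, and proposals whose weights go negative are rejected):

import numpy as np

Vt_hat = np.array([[0.7, -0.8, 0.1]])           # (components_kept, n_models)
bias0 = np.full(Vt_hat.shape[1], 1 / Vt_hat.shape[1])
b = np.array([0.05])                            # one sampled coefficient vector
omegas = b @ Vt_hat + bias0                     # model weights
print(omegas, omegas.sum())                     # nonnegative entries summing to 1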

Source code in pybmc/inference_utils.py
def gibbs_sampler_simplex(
    y, X, Vt_hat, S_hat, iterations, prior_info, burn=10000, stepsize=0.001
):
    """
    Performs Gibbs sampling with simplex constraints on model weights.

    Args:
        y (numpy.ndarray): Centered response vector.
        X (numpy.ndarray): Design matrix of principal components.
        Vt_hat (numpy.ndarray): Normalized right singular vectors.
        S_hat (numpy.ndarray): Singular values.
        iterations (int): Number of sampling iterations.
        prior_info (list[float]): `[nu0, sigma20]` - prior parameters for variance.
        burn (int, optional): Burn-in iterations (default: 10000).
        stepsize (float, optional): Proposal step size (default: 0.001).

    Returns:
        numpy.ndarray: Posterior samples `[beta, sigma]`.
    """
    bias0 = np.full(len(Vt_hat.T), 1 / len(Vt_hat.T))
    nu0, sigma20 = prior_info
    cov_matrix_step = np.diag(S_hat**2 * stepsize**2)
    n = len(y)
    b_current = np.full(len(X.T), 0)
    supermodel_current = X.dot(b_current)
    residuals_current = y - supermodel_current
    log_likelihood_current = -np.sum(residuals_current**2)
    sigma2 = -log_likelihood_current / len(residuals_current)
    samples = []
    acceptance = 0

    # Validate inputs
    if burn < 0:
        raise ValueError("Burn-in iterations must be non-negative.")
    if stepsize <= 0:
        raise ValueError("Stepsize must be positive.")

    # Burn-in phase
    for i in range(burn):
        b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step)
        omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0

        # Skip proposals with negative weights
        if not np.any(omegas_proposed < 0):
            supermodel_proposed = X.dot(b_proposed)
            residuals_proposed = y - supermodel_proposed
            log_likelihood_proposed = -np.sum(residuals_proposed**2)
            acceptance_prob = min(
                1,
                np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2),
            )
            if np.random.uniform() < acceptance_prob:
                b_current = np.copy(b_proposed)
                log_likelihood_current = log_likelihood_proposed

        # Sample variance
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0
        sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post)

    # Sampling phase
    for i in range(iterations):
        b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step)
        omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0

        if not np.any(omegas_proposed < 0):
            supermodel_proposed = X.dot(b_proposed)
            residuals_proposed = y - supermodel_proposed
            log_likelihood_proposed = -np.sum(residuals_proposed**2)
            acceptance_prob = min(
                1,
                np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2),
            )
            if np.random.uniform() < acceptance_prob:
                b_current = np.copy(b_proposed)
                log_likelihood_current = log_likelihood_proposed
                acceptance += 1

        # Sample variance
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0
        sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post)
        samples.append(np.append(b_current, np.sqrt(sigma2)))

    return np.array(samples)

coverage(percentiles, rndm_m, models_output, truth_column)

Calculates coverage percentages for credible intervals.

Parameters:

    percentiles (list[int], required): Percentiles to evaluate (e.g., [5, 10, ..., 95]).
    rndm_m (numpy.ndarray, required): Posterior samples of predictions.
    models_output (pandas.DataFrame, required): DataFrame containing true values.
    truth_column (str, required): Name of column with true values.

Returns:

    list[float]: Coverage percentages for each percentile.
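
A toy check with synthetic posterior draws; because the draws are centered exactly on the truth values, every credible level covers all three points:

import numpy as np
import pandas as pd
from pybmc.sampling_utils import coverage

rng = np.random.default_rng(0)
truth = np.array([1.0, 2.0, 3.0])
rndm_m = truth + rng.normal(scale=0.1, size=(5000, 3))   # (n_samples, n_points)
df = pd.DataFrame({"exp": truth})
print(coverage([50, 90, 100], rndm_m, df, truth_column="exp"))   # -> [100.0, 100.0, 100.0] here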

Source code in pybmc/sampling_utils.py
def coverage(percentiles, rndm_m, models_output, truth_column):
    """
    Calculates coverage percentages for credible intervals.

    Args:
        percentiles (list[int]): Percentiles to evaluate (e.g., `[5, 10, ..., 95]`).
        rndm_m (numpy.ndarray): Posterior samples of predictions.
        models_output (pandas.DataFrame): DataFrame containing true values.
        truth_column (str): Name of column with true values.

    Returns:
        list[float]: Coverage percentages for each percentile.
    """
    #  How often the model’s credible intervals actually contain the true value
    data_total = len(rndm_m.T)  # Number of data points
    M_evals = len(rndm_m)  # Number of samples
    data_true = models_output[truth_column].tolist()

    coverage_results = []

    for p in percentiles:
        count_covered = 0
        for i in range(data_total):
            # Sort model evaluations for the i-th data point
            sorted_evals = np.sort(rndm_m.T[i])
            # Find indices for lower and upper bounds of the credible interval
            lower_idx = int((0.5 - p / 200) * M_evals)
            upper_idx = int((0.5 + p / 200) * M_evals) - 1
            # Check if the true value y[i] is within this interval
            if sorted_evals[lower_idx] <= data_true[i] <= sorted_evals[upper_idx]:
                count_covered += 1
        coverage_results.append(count_covered / data_total * 100)

    return coverage_results

rndm_m_random_calculator(filtered_model_predictions, samples, Vt_hat)

Generates posterior predictive samples and credible intervals.

Parameters:

    filtered_model_predictions (numpy.ndarray, required): Model predictions.
    samples (numpy.ndarray, required): Gibbs samples [beta, sigma].
    Vt_hat (numpy.ndarray, required): Normalized right singular vectors.

Returns:

    tuple[numpy.ndarray, list[numpy.ndarray]]:
        - rndm_m (numpy.ndarray): Posterior predictive samples.
        - [lower, median, upper] (list[numpy.ndarray]): Credible interval arrays.
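
A shape-only sketch continuing from the gibbs_sampler example above; at least 10000 posterior samples are needed because 10000 draws are selected without replacement (the Vt_hat matrix and predictions here are arbitrary placeholders):

from pybmc.sampling_utils import rndm_m_random_calculator

samples = gibbs_sampler(y, X, iterations=12000, prior_info=prior_info)
Vt_hat = np.array([[0.6, -0.7, 0.1], [0.2, 0.3, -0.5]])   # (components_kept, n_models)
preds = rng.normal(size=(50, 3))                          # (n_points, n_models)
rndm_m, (lower, median, upper) = rndm_m_random_calculator(preds, samples, Vt_hat)
# rndm_m: (10000, 50); lower, median, upper: (50,) each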

Source code in pybmc/sampling_utils.py
def rndm_m_random_calculator(filtered_model_predictions, samples, Vt_hat):
    """
    Generates posterior predictive samples and credible intervals.

    Args:
        filtered_model_predictions (numpy.ndarray): Model predictions.
        samples (numpy.ndarray): Gibbs samples `[beta, sigma]`.
        Vt_hat (numpy.ndarray): Normalized right singular vectors.

    Returns:
        tuple[numpy.ndarray, list[numpy.ndarray]]:
            - `rndm_m` (numpy.ndarray): Posterior predictive samples.
            - `[lower, median, upper]` (list[numpy.ndarray]): Credible interval arrays.
    """
    np.random.seed(142858)
    rng = np.random.default_rng()

    theta_rand_selected = rng.choice(samples, 10000, replace=False)

    # Extract betas and noise std deviations
    betas = theta_rand_selected[:, :-1]  # shape: (10000, n_components)
    noise_stds = theta_rand_selected[:, -1]  # shape: (10000,)

    # Compute model weights: shape (10000, num_models)
    default_weights = np.full(Vt_hat.shape[1], 1 / Vt_hat.shape[1])
    model_weights_random = (
        betas @ Vt_hat + default_weights
    )  # broadcasting default_weights

    # Generate noiseless predictions: shape (10000, num_data_points)
    yvals_rand_radius = (
        model_weights_random @ filtered_model_predictions.T
    )  # dot product

    # Add Gaussian noise with std = noise_stds (assume diagonal covariance)
    # We'll use broadcasting: noise_stds[:, None] * standard normal noise
    noise = rng.standard_normal(yvals_rand_radius.shape) * noise_stds[:, None]
    rndm_m = yvals_rand_radius + noise

    # Compute credible intervals
    lower_radius = np.percentile(rndm_m, 2.5, axis=0)
    median_radius = np.percentile(rndm_m, 50, axis=0)
    upper_radius = np.percentile(rndm_m, 97.5, axis=0)

    return rndm_m, [lower_radius, median_radius, upper_radius]