
API Reference

This API reference provides detailed documentation for the classes and functions in the pybmc package. It is automatically generated from the docstrings in the source code.

Dataset

Manages datasets for Bayesian model combination workflows.

Supports loading data from HDF5 and CSV files, splitting data, and filtering.

Attributes:

    data_source (str): Path to data file.
    data (dict[str, DataFrame]): Dictionary of loaded data by property.
    domain_keys (list[str]): Domain columns used for data alignment.

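A minimal usage sketch (not part of the generated reference): the file name, model names, and property columns below are illustrative assumptions, and the import path follows the pybmc/data.py location shown under "Source code".

    from pybmc.data import Dataset

    # Load two properties for two models from a hypothetical HDF5 file.
    dataset = Dataset("data.h5")
    data = dataset.load_data(
        models=["model1", "model2"],
        keys=["BE", "Rad"],
        domain_keys=["Z", "N"],
    )
    print(dataset.view_data())  # available properties and models
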
Source code in pybmc/data.py
class Dataset:
    """
    Manages datasets for Bayesian model combination workflows.

    Supports loading data from HDF5 and CSV files, splitting data, and filtering.

    Attributes:
        data_source (str): Path to data file.
        data (dict[str, pandas.DataFrame]): Dictionary of loaded data by property.
        domain_keys (list[str]): Domain columns used for data alignment.
    """

    def __init__(self, data_source=None):
        """
        Initializes the Dataset instance.

        Args:
            data_source (str, optional): Path to data file (.h5 or .csv).
        """
        self.data_source = data_source
        self.data = {}  # Dictionary of model to DataFrame
        self.domain_keys = ["X1", "X2"]  # Default domain keys

    def load_data(self, models, keys=None, domain_keys=None, model_column="model"):
        """
        Loads data for multiple properties and models.

        Args:
            models (list[str]): Model names to load.
            keys (list[str]): Property names to extract.
            domain_keys (list[str], optional): Domain columns (default: ['N', 'Z']).
            model_column (str, optional): CSV column identifying models (default: 'model').

        Returns:
            dict[str, pandas.DataFrame]: Dictionary of DataFrames keyed by property name.

        Raises:
            ValueError: If `data_source` not specified or `keys` missing.
            FileNotFoundError: If `data_source` doesn't exist.

        Example:
            >>> dataset = Dataset('data.h5')
            >>> data = dataset.load_data(
                    models=['model1', 'model2'],
                    keys=['BE', 'Rad'],
                    domain_keys=['Z', 'N']
                )
        """
        if domain_keys is None:
            domain_keys = ["N", "Z"]  # documented default domain columns
        self.domain_keys = domain_keys

        if self.data_source is None:
            raise ValueError("Data source must be specified.")
        if not os.path.exists(self.data_source):
            raise FileNotFoundError(f"Data source '{self.data_source}' not found.")
        if keys is None:
            raise ValueError("You must specify which properties to extract via 'keys'.")

        result = {}

        for prop in keys:
            dfs = []
            skipped_models = []

            if self.data_source.endswith(".h5"):
                for model in models:
                    df = pd.read_hdf(self.data_source, key=model)
                    # Check required columns
                    missing_cols = [
                        col for col in domain_keys + [prop] if col not in df.columns
                    ]
                    if missing_cols:
                        print(
                            f"[Skipped] Model '{model}' missing columns {missing_cols} for property '{prop}'."
                        )
                        skipped_models.append(model)
                        continue
                    temp = df[domain_keys + [prop]].copy()
                    temp.rename(columns={prop: model}, inplace=True)  # type: ignore
                    dfs.append(temp)
            elif self.data_source.endswith(".csv"):
                df = pd.read_csv(self.data_source)
                for model in models:
                    if model_column not in df.columns:
                        raise ValueError(
                            f"Expected column '{model_column}' not found in CSV."
                        )
                    model_df = df[df[model_column] == model]
                    missing_cols = [
                        col
                        for col in domain_keys + [prop]
                        if col not in model_df.columns
                    ]
                    if missing_cols:
                        print(
                            f"[Skipped] Model '{model}' missing columns {missing_cols} for key '{prop}'."
                        )
                        skipped_models.append(model)
                        continue
                    temp = model_df[domain_keys + [prop]].copy()
                    temp.rename(columns={prop: model}, inplace=True)
                    dfs.append(temp)
            else:
                raise ValueError(
                    "Unsupported file format. Only .h5 and .csv are supported."
                )

            if not dfs:
                print(
                    f"[Warning] No models with property '{prop}'. Resulting DataFrame will be empty."
                )
                result[prop] = pd.DataFrame(
                    columns=domain_keys + [m for m in models if m not in skipped_models]
                )
                continue

            # Intersect domain for this property
            common_df = dfs[0]
            for other_df in dfs[1:]:
                common_df = pd.merge(common_df, other_df, on=domain_keys, how="inner")

            result[prop] = common_df

        self.data = result
        return result

    def view_data(self, property_name=None, model_name=None):
        """
        Provides flexible data viewing options.

        Args:
            property_name (str, optional): Specific property to view.
            model_name (str, optional): Specific model to view.

        Returns:
            Union[dict[str, Union[pandas.DataFrame, str]], pandas.DataFrame, pandas.Series]:
                - If no args: dict of available properties/models.
                - If only `model_name`: dict of `{property: DataFrame}`.
                - If only `property_name`: DataFrame for property.
                - If both: Series of model values for property.

        Raises:
            RuntimeError: If no data loaded.
            KeyError: If property or model not found.
        """

        if not self.data:
            raise RuntimeError("No data loaded. Run `load_data(...)` first.")

        if property_name is None and model_name is None:
            props = list(self.data.keys())
            models = sorted(
                set(
                    col
                    for prop_df in self.data.values()
                    for col in prop_df.columns
                    if col not in self.domain_keys
                )
            )

            return {"available_properties": props, "available_models": models}

        if model_name is not None and property_name is None:
            # Return a dictionary: {property: DataFrame of domain keys + model column}
            result = {}
            for prop, df in self.data.items():
                if model_name in df.columns:
                    cols = self.domain_keys + [model_name]
                    result[prop] = df[cols]
                else:
                    result[prop] = f"[Model '{model_name}' not available]"
            return result

        if property_name is not None:
            if property_name not in self.data:
                raise KeyError(f"Property '{property_name}' not found.")

            df = self.data[property_name]

            if model_name is None:
                return df  # Full property DataFrame

            if model_name not in df.columns:
                raise KeyError(
                    f"Model '{model_name}' not found in property '{property_name}'."
                )

            return df[model_name]

    def separate_points_distance_allSets(self, list1, list2, distance1, distance2):
        """
        Separates points into groups based on proximity thresholds.

        Args:
            list1 (list[tuple[float, float]]): Points to classify as (x, y) tuples.
            list2 (list[tuple[float, float]]): Reference points as (x, y) tuples.
            distance1 (float): First proximity threshold.
            distance2 (float): Second proximity threshold.

        Returns:
            tuple[list[int], list[int], list[int]]: Three lists of indices from `list1`:
                - Within `distance1` of any point in `list2`.
                - Within `distance2` but not `distance1`.
                - Beyond `distance2`.
        """
        train = []
        validation = []
        test = []

        train_list_coordinates = []
        validation_list_coordinates = []
        test_list_coordinates = []

        for i in range(len(list1)):
            point1 = list1[i]
            close = False
            for point2 in list2:
                if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance1:
                    close = True
                    break
            if close:
                train.append(point1)
                train_list_coordinates.append(i)
            else:
                close2 = False
                for point2 in list2:
                    if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance2:
                        close2 = True
                        break
                if close2:
                    validation.append(point1)
                    validation_list_coordinates.append(i)
                else:
                    test.append(point1)
                    test_list_coordinates.append(i)

        return (
            train_list_coordinates,
            validation_list_coordinates,
            test_list_coordinates,
        )

    def split_data(
        self, data_dict, property_name, splitting_algorithm="random", **kwargs
    ):
        """
        Splits data into training, validation, and test sets.

        Args:
            data_dict (dict[str, pandas.DataFrame]): Output from `load_data()`.
            property_name (str): Property to use for splitting.
            splitting_algorithm (str): 'random' or 'inside_to_outside'.
            **kwargs: Algorithm-specific parameters:
                - `random`: `train_size` (float), `val_size` (float), `test_size` (float).
                - `inside_to_outside`: `stable_points` (list[tuple[float, float]]), `distance1` (float), `distance2` (float).

        Returns:
            tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: (train, validation, test) DataFrames.

        Raises:
            ValueError: For invalid algorithm or missing parameters.
        """
        if property_name not in data_dict:
            raise ValueError(
                f"Property '{property_name}' not found in the provided data dictionary."
            )

        data = data_dict[property_name]

        if isinstance(data, pd.DataFrame):
            indexable_data = data.reset_index(drop=True)
            point_list = list(indexable_data.itertuples(index=False, name=None))
        else:
            raise TypeError(
                "Data for the specified property must be a pandas DataFrame."
            )

        if splitting_algorithm == "random":
            required = ["train_size", "val_size", "test_size"]
            if not all(k in kwargs for k in required):
                raise ValueError(f"Missing required kwargs for 'random': {required}")

            train_size = kwargs["train_size"]
            val_size = kwargs["val_size"]
            test_size = kwargs["test_size"]

            if not np.isclose(train_size + val_size + test_size, 1.0):
                raise ValueError("train_size + val_size + test_size must equal 1.0")

            # Random split using indexes
            train_idx, temp_idx = train_test_split(
                indexable_data.index, train_size=train_size, random_state=1
            )
            val_rel = val_size / (val_size + test_size)
            val_idx, test_idx = train_test_split(
                temp_idx, test_size=1 - val_rel, random_state=1
            )

        elif splitting_algorithm == "inside_to_outside":
            required = ["stable_points", "distance1", "distance2"]
            if not all(k in kwargs for k in required):
                raise ValueError(
                    f"Missing required kwargs for 'inside_to_outside': {required}"
                )

            stable_points = kwargs["stable_points"]
            distance1 = kwargs["distance1"]
            distance2 = kwargs["distance2"]

            (
                train_idx,
                val_idx,
                test_idx,
            ) = self.separate_points_distance_allSets(
                point_list, stable_points, distance1, distance2
            )
        else:
            raise ValueError(
                "splitting_algorithm must be either 'random' or 'inside_to_outside'"
            )

        train_data = indexable_data.iloc[train_idx]
        val_data = indexable_data.iloc[val_idx]
        test_data = indexable_data.iloc[test_idx]

        return train_data, val_data, test_data

    def get_subset(self, property_name, filters=None, models_to_include=None):
        """
        Returns a filtered subset of data for a property.

        Args:
            property_name (str): Property to filter.
            filters (dict, optional): Domain filtering rules.
            models_to_include (list[str], optional): Models to include.

        Returns:
            pandas.DataFrame: Filtered DataFrame.

        Raises:
            ValueError: If property not found.
        """
        if property_name not in self.data:
            raise ValueError(f"Property '{property_name}' not found in dataset.")

        df = self.data[property_name].copy()

        # Apply row-level filters (domain-based)
        if filters:
            for column, condition in filters.items():
                if column == "multi" and callable(condition):
                    df = df[df.apply(condition, axis=1)]
                elif callable(condition):
                    df = df[condition(df[column])]
                elif isinstance(condition, tuple) and len(condition) == 2:
                    df = df[(df[column] >= condition[0]) & (df[column] <= condition[1])]
                elif isinstance(condition, list):
                    df = df[df[column].isin(condition)]
                else:
                    df = df[df[column] == condition]

        # Optionally restrict to a subset of models
        if models_to_include is not None:
            domain_keys = [col for col in ["N", "Z"] if col in df.columns]
            allowed_cols = domain_keys + [
                m for m in models_to_include if m in df.columns
            ]
            df = df[allowed_cols]

        return df

__init__(data_source=None)

Initializes the Dataset instance.

Parameters:

    data_source (str, optional): Path to data file (.h5 or .csv). Default: None.

Source code in pybmc/data.py
def __init__(self, data_source=None):
    """
    Initializes the Dataset instance.

    Args:
        data_source (str, optional): Path to data file (.h5 or .csv).
    """
    self.data_source = data_source
    self.data = {}  # Dictionary of model to DataFrame
    self.domain_keys = ["X1", "X2"]  # Default domain keys

get_subset(property_name, filters=None, models_to_include=None)

Returns a filtered subset of data for a property.

Parameters:

    property_name (str): Property to filter. Required.
    filters (dict, optional): Domain filtering rules. Default: None.
    models_to_include (list[str], optional): Models to include. Default: None.

Returns:

    pandas.DataFrame: Filtered DataFrame.

Raises:

    ValueError: If property not found.

Source code in pybmc/data.py
def get_subset(self, property_name, filters=None, models_to_include=None):
    """
    Returns a filtered subset of data for a property.

    Args:
        property_name (str): Property to filter.
        filters (dict, optional): Domain filtering rules.
        models_to_include (list[str], optional): Models to include.

    Returns:
        pandas.DataFrame: Filtered DataFrame.

    Raises:
        ValueError: If property not found.
    """
    if property_name not in self.data:
        raise ValueError(f"Property '{property_name}' not found in dataset.")

    df = self.data[property_name].copy()

    # Apply row-level filters (domain-based)
    if filters:
        for column, condition in filters.items():
            if column == "multi" and callable(condition):
                df = df[df.apply(condition, axis=1)]
            elif callable(condition):
                df = df[condition(df[column])]
            elif isinstance(condition, tuple) and len(condition) == 2:
                df = df[(df[column] >= condition[0]) & (df[column] <= condition[1])]
            elif isinstance(condition, list):
                df = df[df[column].isin(condition)]
            else:
                df = df[df[column] == condition]

    # Optionally restrict to a subset of models
    if models_to_include is not None:
        domain_keys = [col for col in ["N", "Z"] if col in df.columns]
        allowed_cols = domain_keys + [
            m for m in models_to_include if m in df.columns
        ]
        df = df[allowed_cols]

    return df
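
A short sketch of the filter formats handled above (tuple = inclusive range, list = membership, callable = boolean mask, and the special "multi" key for row-wise callables), continuing the hypothetical 'BE' data and Z/N domain from the earlier sketch:

    # Rows with 20 <= Z <= 30 and N in a fixed set, keeping only 'model1'.
    subset = dataset.get_subset(
        "BE",
        filters={"Z": (20, 30), "N": [20, 28, 40]},
        models_to_include=["model1"],
    )

    # Row-wise filter via the special "multi" key.
    even_even = dataset.get_subset(
        "BE",
        filters={"multi": lambda row: row["Z"] % 2 == 0 and row["N"] % 2 == 0},
    )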

load_data(models, keys=None, domain_keys=None, model_column='model')

Loads data for multiple properties and models.

Parameters:

    models (list[str]): Model names to load. Required.
    keys (list[str]): Property names to extract. Default: None.
    domain_keys (list[str], optional): Domain columns (default: ['N', 'Z']). Default: None.
    model_column (str, optional): CSV column identifying models (default: 'model'). Default: 'model'.

Returns:

    dict[str, pandas.DataFrame]: Dictionary of DataFrames keyed by property name.

Raises:

    ValueError: If data_source not specified or keys missing.
    FileNotFoundError: If data_source doesn't exist.

Example:

    dataset = Dataset('data.h5')
    data = dataset.load_data(
        models=['model1', 'model2'],
        keys=['BE', 'Rad'],
        domain_keys=['Z', 'N']
    )

Source code in pybmc/data.py
def load_data(self, models, keys=None, domain_keys=None, model_column="model"):
    """
    Loads data for multiple properties and models.

    Args:
        models (list[str]): Model names to load.
        keys (list[str]): Property names to extract.
        domain_keys (list[str], optional): Domain columns (default: ['N', 'Z']).
        model_column (str, optional): CSV column identifying models (default: 'model').

    Returns:
        dict[str, pandas.DataFrame]: Dictionary of DataFrames keyed by property name.

    Raises:
        ValueError: If `data_source` not specified or `keys` missing.
        FileNotFoundError: If `data_source` doesn't exist.

    Example:
        >>> dataset = Dataset('data.h5')
        >>> data = dataset.load_data(
                models=['model1', 'model2'],
                keys=['BE', 'Rad'],
                domain_keys=['Z', 'N']
            )
    """
    if domain_keys is None:
        domain_keys = ["N", "Z"]  # documented default domain columns
    self.domain_keys = domain_keys

    if self.data_source is None:
        raise ValueError("Data source must be specified.")
    if not os.path.exists(self.data_source):
        raise FileNotFoundError(f"Data source '{self.data_source}' not found.")
    if keys is None:
        raise ValueError("You must specify which properties to extract via 'keys'.")

    result = {}

    for prop in keys:
        dfs = []
        skipped_models = []

        if self.data_source.endswith(".h5"):
            for model in models:
                df = pd.read_hdf(self.data_source, key=model)
                # Check required columns
                missing_cols = [
                    col for col in domain_keys + [prop] if col not in df.columns
                ]
                if missing_cols:
                    print(
                        f"[Skipped] Model '{model}' missing columns {missing_cols} for property '{prop}'."
                    )
                    skipped_models.append(model)
                    continue
                temp = df[domain_keys + [prop]].copy()
                temp.rename(columns={prop: model}, inplace=True)  # type: ignore
                dfs.append(temp)
        elif self.data_source.endswith(".csv"):
            df = pd.read_csv(self.data_source)
            for model in models:
                if model_column not in df.columns:
                    raise ValueError(
                        f"Expected column '{model_column}' not found in CSV."
                    )
                model_df = df[df[model_column] == model]
                missing_cols = [
                    col
                    for col in domain_keys + [prop]
                    if col not in model_df.columns
                ]
                if missing_cols:
                    print(
                        f"[Skipped] Model '{model}' missing columns {missing_cols} for key '{prop}'."
                    )
                    skipped_models.append(model)
                    continue
                temp = model_df[domain_keys + [prop]].copy()
                temp.rename(columns={prop: model}, inplace=True)
                dfs.append(temp)
        else:
            raise ValueError(
                "Unsupported file format. Only .h5 and .csv are supported."
            )

        if not dfs:
            print(
                f"[Warning] No models with property '{prop}'. Resulting DataFrame will be empty."
            )
            result[prop] = pd.DataFrame(
                columns=domain_keys + [m for m in models if m not in skipped_models]
            )
            continue

        # Intersect domain for this property
        common_df = dfs[0]
        for other_df in dfs[1:]:
            common_df = pd.merge(common_df, other_df, on=domain_keys, how="inner")

        result[prop] = common_df

    self.data = result
    return result
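
For a CSV source the same call applies, provided one column identifies the model for each row. A sketch assuming a hypothetical predictions.csv with columns model, Z, N, and BE:

    csv_dataset = Dataset("predictions.csv")
    csv_data = csv_dataset.load_data(
        models=["model1", "model2"],
        keys=["BE"],
        domain_keys=["Z", "N"],
        model_column="model",
    )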

separate_points_distance_allSets(list1, list2, distance1, distance2)

Separates points into groups based on proximity thresholds.

Parameters:

    list1 (list[tuple[float, float]]): Points to classify as (x, y) tuples. Required.
    list2 (list[tuple[float, float]]): Reference points as (x, y) tuples. Required.
    distance1 (float): First proximity threshold. Required.
    distance2 (float): Second proximity threshold. Required.

Returns:

    tuple[list[int], list[int], list[int]]: Three lists of indices from list1:
        - Within distance1 of any point in list2.
        - Within distance2 but not distance1.
        - Beyond distance2.

Source code in pybmc/data.py
def separate_points_distance_allSets(self, list1, list2, distance1, distance2):
    """
    Separates points into groups based on proximity thresholds.

    Args:
        list1 (list[tuple[float, float]]): Points to classify as (x, y) tuples.
        list2 (list[tuple[float, float]]): Reference points as (x, y) tuples.
        distance1 (float): First proximity threshold.
        distance2 (float): Second proximity threshold.

    Returns:
        tuple[list[int], list[int], list[int]]: Three lists of indices from `list1`:
            - Within `distance1` of any point in `list2`.
            - Within `distance2` but not `distance1`.
            - Beyond `distance2`.
    """
    train = []
    validation = []
    test = []

    train_list_coordinates = []
    validation_list_coordinates = []
    test_list_coordinates = []

    for i in range(len(list1)):
        point1 = list1[i]
        close = False
        for point2 in list2:
            if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance1:
                close = True
                break
        if close:
            train.append(point1)
            train_list_coordinates.append(i)
        else:
            close2 = False
            for point2 in list2:
                if np.linalg.norm(np.array(point1) - np.array(point2)) <= distance2:
                    close2 = True
                    break
            if close2:
                validation.append(point1)
                validation_list_coordinates.append(i)
            else:
                test.append(point1)
                test_list_coordinates.append(i)

    return (
        train_list_coordinates,
        validation_list_coordinates,
        test_list_coordinates,
    )
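
A small sketch of the classification behaviour with made-up coordinates; note that index lists, not the points themselves, are returned:

    points = [(0, 0), (3, 0), (10, 10)]
    reference = [(0, 1)]
    near, middle, far = dataset.separate_points_distance_allSets(
        points, reference, distance1=2.0, distance2=5.0
    )
    # near == [0]   -> within 2.0 of (0, 1)
    # middle == [1] -> within 5.0 but not 2.0
    # far == [2]    -> beyond 5.0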

split_data(data_dict, property_name, splitting_algorithm='random', **kwargs)

Splits data into training, validation, and test sets.

Parameters:

    data_dict (dict[str, DataFrame]): Output from load_data(). Required.
    property_name (str): Property to use for splitting. Required.
    splitting_algorithm (str): 'random' or 'inside_to_outside'. Default: 'random'.
    **kwargs: Algorithm-specific parameters:
        - random: train_size (float), val_size (float), test_size (float).
        - inside_to_outside: stable_points (list[tuple[float, float]]), distance1 (float), distance2 (float).

Returns:

    tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: (train, validation, test) DataFrames.

Raises:

    ValueError: For invalid algorithm or missing parameters.

Source code in pybmc/data.py
def split_data(
    self, data_dict, property_name, splitting_algorithm="random", **kwargs
):
    """
    Splits data into training, validation, and test sets.

    Args:
        data_dict (dict[str, pandas.DataFrame]): Output from `load_data()`.
        property_name (str): Property to use for splitting.
        splitting_algorithm (str): 'random' or 'inside_to_outside'.
        **kwargs: Algorithm-specific parameters:
            - `random`: `train_size` (float), `val_size` (float), `test_size` (float).
            - `inside_to_outside`: `stable_points` (list[tuple[float, float]]), `distance1` (float), `distance2` (float).

    Returns:
        tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: (train, validation, test) DataFrames.

    Raises:
        ValueError: For invalid algorithm or missing parameters.
    """
    if property_name not in data_dict:
        raise ValueError(
            f"Property '{property_name}' not found in the provided data dictionary."
        )

    data = data_dict[property_name]

    if isinstance(data, pd.DataFrame):
        indexable_data = data.reset_index(drop=True)
        point_list = list(indexable_data.itertuples(index=False, name=None))
    else:
        raise TypeError(
            "Data for the specified property must be a pandas DataFrame."
        )

    if splitting_algorithm == "random":
        required = ["train_size", "val_size", "test_size"]
        if not all(k in kwargs for k in required):
            raise ValueError(f"Missing required kwargs for 'random': {required}")

        train_size = kwargs["train_size"]
        val_size = kwargs["val_size"]
        test_size = kwargs["test_size"]

        if not np.isclose(train_size + val_size + test_size, 1.0):
            raise ValueError("train_size + val_size + test_size must equal 1.0")

        # Random split using indexes
        train_idx, temp_idx = train_test_split(
            indexable_data.index, train_size=train_size, random_state=1
        )
        val_rel = val_size / (val_size + test_size)
        val_idx, test_idx = train_test_split(
            temp_idx, test_size=1 - val_rel, random_state=1
        )

    elif splitting_algorithm == "inside_to_outside":
        required = ["stable_points", "distance1", "distance2"]
        if not all(k in kwargs for k in required):
            raise ValueError(
                f"Missing required kwargs for 'inside_to_outside': {required}"
            )

        stable_points = kwargs["stable_points"]
        distance1 = kwargs["distance1"]
        distance2 = kwargs["distance2"]

        (
            train_idx,
            val_idx,
            test_idx,
        ) = self.separate_points_distance_allSets(
            point_list, stable_points, distance1, distance2
        )
    else:
        raise ValueError(
            "splitting_algorithm must be either 'random' or 'inside_to_outside'"
        )

    train_data = indexable_data.iloc[train_idx]
    val_data = indexable_data.iloc[val_idx]
    test_data = indexable_data.iloc[test_idx]

    return train_data, val_data, test_data
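
A sketch of the 'random' mode, continuing the hypothetical 'BE' data from the earlier sketch (the 'inside_to_outside' mode instead takes stable_points, distance1, and distance2 and delegates to separate_points_distance_allSets):

    train_df, val_df, test_df = dataset.split_data(
        data,
        "BE",
        splitting_algorithm="random",
        train_size=0.6,
        val_size=0.2,
        test_size=0.2,
    )
    print(len(train_df), len(val_df), len(test_df))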

view_data(property_name=None, model_name=None)

Provides flexible data viewing options.

Parameters:

    property_name (str, optional): Specific property to view. Default: None.
    model_name (str, optional): Specific model to view. Default: None.

Returns:

    Union[dict[str, Union[pandas.DataFrame, str]], pandas.DataFrame, pandas.Series]:
        - If no args: dict of available properties/models.
        - If only model_name: dict of {property: DataFrame}.
        - If only property_name: DataFrame for property.
        - If both: Series of model values for property.

Raises:

    RuntimeError: If no data loaded.
    KeyError: If property or model not found.

Source code in pybmc/data.py
def view_data(self, property_name=None, model_name=None):
    """
    Provides flexible data viewing options.

    Args:
        property_name (str, optional): Specific property to view.
        model_name (str, optional): Specific model to view.

    Returns:
        Union[dict[str, Union[pandas.DataFrame, str]], pandas.DataFrame, pandas.Series]:
            - If no args: dict of available properties/models.
            - If only `model_name`: dict of `{property: DataFrame}`.
            - If only `property_name`: DataFrame for property.
            - If both: Series of model values for property.

    Raises:
        RuntimeError: If no data loaded.
        KeyError: If property or model not found.
    """

    if not self.data:
        raise RuntimeError("No data loaded. Run `load_data(...)` first.")

    if property_name is None and model_name is None:
        props = list(self.data.keys())
        models = sorted(
            set(
                col
                for prop_df in self.data.values()
                for col in prop_df.columns
                if col not in self.domain_keys
            )
        )

        return {"available_properties": props, "available_models": models}

    if model_name is not None and property_name is None:
        # Return a dictionary: {property: DataFrame of domain keys + model column}
        result = {}
        for prop, df in self.data.items():
            if model_name in df.columns:
                cols = self.domain_keys + [model_name]
                result[prop] = df[cols]
            else:
                result[prop] = f"[Model '{model_name}' not available]"
        return result

    if property_name is not None:
        if property_name not in self.data:
            raise KeyError(f"Property '{property_name}' not found.")

        df = self.data[property_name]

        if model_name is None:
            return df  # Full property DataFrame

        if model_name not in df.columns:
            raise KeyError(
                f"Model '{model_name}' not found in property '{property_name}'."
            )

        return df[model_name]
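
A few viewing calls, again using the hypothetical 'BE' property and 'model1' from the earlier sketches:

    dataset.view_data()                                    # {'available_properties': [...], 'available_models': [...]}
    dataset.view_data(property_name="BE")                  # full DataFrame for 'BE'
    dataset.view_data(model_name="model1")                 # {property: DataFrame of domain keys + 'model1'}
    dataset.view_data(property_name="BE", model_name="model1")  # Series of 'model1' values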

BayesianModelCombination

Implements Bayesian Model Combination (BMC) for aggregating predictions from multiple models.

This class performs orthogonalization of model predictions, trains the model combination using Gibbs sampling, and provides methods for prediction and evaluation.

Parameters:

    models_list (list[str]): List of model names to combine. Required.
    data_dict (dict[str, DataFrame]): Dictionary from load_data() where keys are property names and values are DataFrames. Required.
    truth_column_name (str): Name of the column containing ground truth values. Required.
    weights (list[float], optional): Initial weights for models. Defaults to equal weights. Default: None.

Attributes:

    models_list (list[str]): List of model names.
    data_dict (dict[str, DataFrame]): Loaded data dictionary.
    truth_column_name (str): Ground truth column name.
    weights (list[float]): Current model weights.
    samples (ndarray): Posterior samples from Gibbs sampling.
    current_property (str): Current property being processed.
    centered_experiment_train (ndarray): Centered experimental values.
    U_hat (ndarray): Reduced left singular vectors from SVD.
    Vt_hat (ndarray): Normalized right singular vectors.
    S_hat (ndarray): Retained singular values.
    Vt_hat_normalized (ndarray): Original right singular vectors.
    _predictions_mean_train (ndarray): Mean predictions across models.

Example:

    bmc = BayesianModelCombination(
        models_list=["model1", "model2"],
        data_dict=data,
        truth_column_name="truth"
    )

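A minimal end-to-end sketch tying Dataset and BayesianModelCombination together; the file name, model names, the 'BE' property, and the 'truth' column are illustrative assumptions, and the import paths follow the source locations shown in this reference:

    from pybmc.data import Dataset
    from pybmc.bmc import BayesianModelCombination

    dataset = Dataset("data.h5")  # hypothetical file
    data = dataset.load_data(
        models=["model1", "model2", "model3", "truth"],
        keys=["BE"],
        domain_keys=["Z", "N"],
    )
    train_df, val_df, test_df = dataset.split_data(
        data, "BE", splitting_algorithm="random",
        train_size=0.6, val_size=0.2, test_size=0.2,
    )

    bmc = BayesianModelCombination(
        models_list=["model1", "model2", "model3"],
        data_dict=data,
        truth_column_name="truth",
    )
    bmc.orthogonalize("BE", train_df, components_kept=2)
    bmc.train({"iterations": 10000})  # fewer iterations than the 50000 default, for illustration
    rndm_m, lower_df, median_df, upper_df = bmc.predict(val_df)
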
Source code in pybmc/bmc.py
class BayesianModelCombination:
    """
    Implements Bayesian Model Combination (BMC) for aggregating predictions from multiple models.

    This class performs orthogonalization of model predictions, trains the model combination
    using Gibbs sampling, and provides methods for prediction and evaluation.

    Args:
        models_list (list[str]): List of model names to combine.
        data_dict (dict[str, pandas.DataFrame]): Dictionary from `load_data()` where keys are property names and values are DataFrames.
        truth_column_name (str): Name of the column containing ground truth values.
        weights (list[float], optional): Initial weights for models. Defaults to equal weights.

    Attributes:
        models_list (list[str]): List of model names.
        data_dict (dict[str, pandas.DataFrame]): Loaded data dictionary.
        truth_column_name (str): Ground truth column name.
        weights (list[float]): Current model weights.
        samples (numpy.ndarray): Posterior samples from Gibbs sampling.
        current_property (str): Current property being processed.
        centered_experiment_train (numpy.ndarray): Centered experimental values.
        U_hat (numpy.ndarray): Reduced left singular vectors from SVD.
        Vt_hat (numpy.ndarray): Normalized right singular vectors.
        S_hat (numpy.ndarray): Retained singular values.
        Vt_hat_normalized (numpy.ndarray): Original right singular vectors.
        _predictions_mean_train (numpy.ndarray): Mean predictions across models.

    Example:
        >>> bmc = BayesianModelCombination(
                models_list=["model1", "model2"],
                data_dict=data,
                truth_column_name="truth"
            )
    """

    def __init__(self, models_list, data_dict, truth_column_name, weights=None):
        """
        Initializes the BMC instance.

        Args:
            models_list (list[str]): List of model names to combine.
            data_dict (dict[str, pandas.DataFrame]): Dictionary of DataFrames from Dataset.load_data().
            truth_column_name (str): Name of column containing ground truth values.
            weights (list[float], optional): Initial model weights. Defaults to None (equal weights).

        Raises:
            ValueError: If `models_list` is not a list of strings or `data_dict` is invalid.
        """

        if not isinstance(models_list, list) or not all(
            isinstance(model, str) for model in models_list
        ):
            raise ValueError(
                "The 'models' should be a list of model names (strings) for Bayesian Combination."
            )
        if not isinstance(data_dict, dict) or not all(
            isinstance(df, pd.DataFrame) for df in data_dict.values()
        ):
            raise ValueError(
                "The 'data_dict' should be a dictionary of pandas DataFrames, one per property."
            )

        self.data_dict = data_dict
        self.models_list = models_list
        self.models = [m for m in models_list if m != "truth"]
        self.weights = weights  # None means equal weights (see docstring)
        self.truth_column_name = truth_column_name

    def orthogonalize(self, property, train_df, components_kept):
        """
        Performs orthogonalization of model predictions using SVD.

        This method centers model predictions, performs SVD decomposition, and retains
        the specified number of components for subsequent training.

        Args:
            property (str): Nuclear property to orthogonalize (e.g., 'BE').
            train_df (pandas.DataFrame): Training data from Dataset.split_data().
            components_kept (int): Number of SVD components to retain.

        Note:
            This method must be called before training. Results are stored in instance attributes.
        """
        # Store selected property
        self.current_property = property

        # Extract the relevant DataFrame for that property
        df = self.data_dict[property].copy()
        self.selected_models_dataset = df  # Store for train() and predict()

        # Extract model outputs (only the model columns)
        models_output_train = train_df[self.models]
        model_predictions_train = models_output_train.values

        # Mean prediction across models (per nucleus)
        predictions_mean_train = np.mean(model_predictions_train, axis=1)

        # Experimental truth values for the property
        centered_experiment_train = (
            train_df[self.truth_column_name].values - predictions_mean_train
        )

        # Center model predictions
        model_predictions_train_centered = (
            model_predictions_train - predictions_mean_train[:, None]
        )

        # Perform SVD
        U, S, Vt = np.linalg.svd(model_predictions_train_centered)

        # Dimensionality reduction
        U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept)  # type: ignore

        # Save for training
        self.centered_experiment_train = centered_experiment_train
        self.U_hat = U_hat
        self.Vt_hat = Vt_hat
        self.S_hat = S_hat
        self.Vt_hat_normalized = Vt_hat_normalized
        self._predictions_mean_train = predictions_mean_train

    def train(self, training_options=None):
        """
        Trains the model combination using Gibbs sampling.

        Args:
            training_options (dict, optional): Training configuration. Options:
                - iterations (int): Number of Gibbs iterations (default: 50000).
                - sampler (str): 'gibbs_sampling' or 'simplex' (default: 'gibbs_sampling').
                - burn (int): Burn-in iterations for simplex sampler (default: 10000).
                - stepsize (float): Proposal step size for simplex sampler (default: 0.001).
                - b_mean_prior (numpy.ndarray): Prior mean vector (default: zeros).
                - b_mean_cov (numpy.ndarray): Prior covariance matrix (default: diag(S_hat²)).
                - nu0_chosen (float): Degrees of freedom for variance prior (default: 1.0).
                - sigma20_chosen (float): Prior variance (default: 0.02).

        Note:
            Requires prior call to `orthogonalize()`. Stores posterior samples in `self.samples`.
        """

        if training_options is None:
            training_options = {}

        # functions defined so that whenever a key not specified, we print out the default value for users
        def get_option(key, default):
            if key not in training_options:
                print(f"[INFO] Using default value for '{key}': {default}")
            return training_options.get(key, default)

        iterations = get_option("iterations", 50000)
        sampler = get_option("sampler", "gibbs_sampling")
        burn = get_option("burn", 10000)
        stepsize = get_option("stepsize", 0.001)

        S_hat = self.S_hat
        num_components = self.U_hat.shape[1]

        b_mean_prior = get_option("b_mean_prior", np.zeros(num_components))
        b_mean_cov = get_option("b_mean_cov", np.diag(S_hat**2))
        nu0_chosen = get_option("nu0_chosen", 1.0)
        sigma20_chosen = get_option("sigma20_chosen", 0.02)

        if sampler == "simplex":
            self.samples = gibbs_sampler_simplex(
                self.centered_experiment_train,
                self.U_hat,
                self.Vt_hat,
                self.S_hat,
                iterations,
                [
                    nu0_chosen,
                    sigma20_chosen,
                ],  # Note: no b_mean_prior/b_mean_cov needed
                burn=burn,
                stepsize=stepsize,
            )
        else:
            self.samples = gibbs_sampler(
                self.centered_experiment_train,
                self.U_hat,
                iterations,
                [b_mean_prior, b_mean_cov, nu0_chosen, sigma20_chosen],
            )

    def predict(self, X):
        """
        Predicts values using the trained model combination with uncertainty quantification.

        Args:
            X (pandas.DataFrame): Input data containing model predictions and domain information.

        Returns:
            tuple[numpy.ndarray, pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Contains:
                - rndm_m (numpy.ndarray): Full posterior draws (n_samples, n_points).
                - lower_df (pandas.DataFrame): Lower bounds (2.5th percentile) with domain keys.
                - median_df (pandas.DataFrame): Median predictions with domain keys.
                - upper_df (pandas.DataFrame): Upper bounds (97.5th percentile) with domain keys.

        Raises:
            ValueError: If `orthogonalize()` and `train()` haven't been called.
        """
        if self.samples is None or self.Vt_hat is None:
            raise ValueError(
                "Must call `orthogonalize()` and `train()` before predicting."
            )

        if not isinstance(X, pd.DataFrame):
            raise ValueError(
                "X must be a pandas DataFrame containing model predictions and domain info."
            )

        # Infer model columns vs. domain columns
        model_cols = self.models
        domain_keys = [col for col in X.columns if col not in model_cols]

        model_preds = X[model_cols].values
        rndm_m, (lower, median, upper) = rndm_m_random_calculator(
            model_preds, self.samples, self.Vt_hat
        )

        domain_df = X[domain_keys].reset_index(drop=True)

        lower_df = domain_df.copy()
        lower_df["Predicted_Lower"] = lower

        median_df = domain_df.copy()
        median_df["Predicted_Median"] = median

        upper_df = domain_df.copy()
        upper_df["Predicted_Upper"] = upper

        return rndm_m, lower_df, median_df, upper_df

    def predict2(self, property):
        """
        Predicts values for a specific property using the trained model combination.

        This version uses the property name instead of a DataFrame input.

        Args:
            property (str): Property name to predict (e.g., 'ChRad').

        Returns:
            tuple[numpy.ndarray, pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Contains:
                - rndm_m (numpy.ndarray): Full posterior draws (n_samples, n_points).
                - lower_df (pandas.DataFrame): Lower bounds (2.5th percentile) with domain keys.
                - median_df (pandas.DataFrame): Median predictions with domain keys.
                - upper_df (pandas.DataFrame): Upper bounds (97.5th percentile) with domain keys.

        Raises:
            ValueError: If `orthogonalize()` and `train()` haven't been called.
            KeyError: If property not found in `data_dict`.
        """
        if self.samples is None or self.Vt_hat is None:
            raise ValueError(
                "Must call `orthogonalize()` and `train()` before predicting."
            )

        if property not in self.data_dict:
            raise KeyError(f"Property '{property}' not found in data_dict.")

        df = self.data_dict[property].copy()

        # Infer domain and model columns
        full_model_cols = self.models
        domain_keys = [
            col
            for col in df.columns
            if col not in full_model_cols and col != self.truth_column_name
        ]

        # Determine which models are present
        available_models = [m for m in df.columns if m in self.models]

        # Sets
        trained_models_set = set(self.models)
        available_models_set = set(available_models)

        missing_models = trained_models_set - available_models_set
        extra_models = available_models_set - trained_models_set
        print(f"Available models: {available_models_set}")
        print(f"Trained models: {trained_models_set}")

        if len(extra_models) > 0:
            raise ValueError(
                f"ERROR: Property '{property}' contains extra models not present during training: {list(extra_models)}. "
                "You must retrain if using a larger model space."
            )

        if len(missing_models) > 0:
            print(
                f"WARNING: Predicting on property '{property}' with missing models: {list(missing_models)}"
            )
            print(
                "         The trained model weights include these models — prediction will proceed, but results may not be statistically accurate."
            )

        if len(available_models) == 0:
            raise ValueError(
                "No available trained models are present in prediction DataFrame."
            )

        # Filter predictions and model weights
        model_preds = df[available_models].values
        domain_df = df[domain_keys].reset_index(drop=True)

        # Find indices of available models in training order
        model_indices = [self.models.index(m) for m in available_models]

        # Reduce Vt_hat and samples to only use available models
        Vt_hat_reduced = self.Vt_hat[:, model_indices]

        rndm_m, (lower, median, upper) = rndm_m_random_calculator(
            model_preds, self.samples, Vt_hat_reduced
        )

        # Build output DataFrames
        lower_df = domain_df.copy()
        lower_df["Predicted_Lower"] = lower

        median_df = domain_df.copy()
        median_df["Predicted_Median"] = median

        upper_df = domain_df.copy()
        upper_df["Predicted_Upper"] = upper

        return rndm_m, lower_df, median_df, upper_df

    def evaluate(self, domain_filter=None):
        """
        Evaluates model performance using coverage calculation.

        Args:
            domain_filter (dict, optional): Filtering rules for domain columns.
                Example: {"Z": (20, 30), "N": (20, 40)}.

        Returns:
            list[float]: Coverage percentages for each percentile in [0, 5, 10, ..., 100].
        """
        df = self.data_dict[self.current_property]

        if domain_filter:
            # Inline optimized filtering
            for col, cond in domain_filter.items():
                if col == "multi" and callable(cond):
                    df = df[df.apply(cond, axis=1)]
                elif callable(cond):
                    df = df[cond(df[col])]
                elif isinstance(cond, tuple) and len(cond) == 2:
                    df = df[df[col].between(*cond)]
                elif isinstance(cond, list):
                    df = df[df[col].isin(cond)]
                else:
                    df = df[df[col] == cond]

        preds = df[self.models].to_numpy()
        rndm_m, (lower, median, upper) = rndm_m_random_calculator(
            preds, self.samples, self.Vt_hat
        )

        return coverage(
            np.arange(0, 101, 5),
            rndm_m,
            df,
            truth_column=self.truth_column_name,
        )

__init__(models_list, data_dict, truth_column_name, weights=None)

Initializes the BMC instance.

Parameters:

    models_list (list[str]): List of model names to combine. Required.
    data_dict (dict[str, DataFrame]): Dictionary of DataFrames from Dataset.load_data(). Required.
    truth_column_name (str): Name of column containing ground truth values. Required.
    weights (list[float], optional): Initial model weights. Defaults to None (equal weights).

Raises:

    ValueError: If models_list is not a list of strings or data_dict is invalid.

Source code in pybmc/bmc.py
def __init__(self, models_list, data_dict, truth_column_name, weights=None):
    """
    Initializes the BMC instance.

    Args:
        models_list (list[str]): List of model names to combine.
        data_dict (dict[str, pandas.DataFrame]): Dictionary of DataFrames from Dataset.load_data().
        truth_column_name (str): Name of column containing ground truth values.
        weights (list[float], optional): Initial model weights. Defaults to None (equal weights).

    Raises:
        ValueError: If `models_list` is not a list of strings or `data_dict` is invalid.
    """

    if not isinstance(models_list, list) or not all(
        isinstance(model, str) for model in models_list
    ):
        raise ValueError(
            "The 'models' should be a list of model names (strings) for Bayesian Combination."
        )
    if not isinstance(data_dict, dict) or not all(
        isinstance(df, pd.DataFrame) for df in data_dict.values()
    ):
        raise ValueError(
            "The 'data_dict' should be a dictionary of pandas DataFrames, one per property."
        )

    self.data_dict = data_dict
    self.models_list = models_list
    self.models = [m for m in models_list if m != "truth"]
    self.weights = weights  # None means equal weights (see docstring)
    self.truth_column_name = truth_column_name

evaluate(domain_filter=None)

Evaluates model performance using coverage calculation.

Parameters:

    domain_filter (dict, optional): Filtering rules for domain columns. Example: {"Z": (20, 30), "N": (20, 40)}. Default: None.

Returns:

    list[float]: Coverage percentages for each percentile in [0, 5, 10, ..., 100].

Source code in pybmc/bmc.py
def evaluate(self, domain_filter=None):
    """
    Evaluates model performance using coverage calculation.

    Args:
        domain_filter (dict, optional): Filtering rules for domain columns.
            Example: {"Z": (20, 30), "N": (20, 40)}.

    Returns:
        list[float]: Coverage percentages for each percentile in [0, 5, 10, ..., 100].
    """
    df = self.data_dict[self.current_property]

    if domain_filter:
        # Inline optimized filtering
        for col, cond in domain_filter.items():
            if col == "multi" and callable(cond):
                df = df[df.apply(cond, axis=1)]
            elif callable(cond):
                df = df[cond(df[col])]
            elif isinstance(cond, tuple) and len(cond) == 2:
                df = df[df[col].between(*cond)]
            elif isinstance(cond, list):
                df = df[df[col].isin(cond)]
            else:
                df = df[df[col] == cond]

    preds = df[self.models].to_numpy()
    rndm_m, (lower, median, upper) = rndm_m_random_calculator(
        preds, self.samples, self.Vt_hat
    )

    return coverage(
        np.arange(0, 101, 5),
        rndm_m,
        df,
        truth_column=self.truth_column_name,
    )
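
A sketch of coverage evaluation on a restricted domain, using the filter format from the docstring and assuming orthogonalize() and train() have already been run:

    coverages = bmc.evaluate(domain_filter={"Z": (20, 30), "N": (20, 40)})
    # One coverage percentage per nominal credible level 0, 5, ..., 100.
    print(coverages)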

orthogonalize(property, train_df, components_kept)

Performs orthogonalization of model predictions using SVD.

This method centers model predictions, performs SVD decomposition, and retains the specified number of components for subsequent training.

Parameters:

    property (str): Nuclear property to orthogonalize (e.g., 'BE'). Required.
    train_df (DataFrame): Training data from Dataset.split_data(). Required.
    components_kept (int): Number of SVD components to retain. Required.

Note:

    This method must be called before training. Results are stored in instance attributes.

Source code in pybmc/bmc.py
def orthogonalize(self, property, train_df, components_kept):
    """
    Performs orthogonalization of model predictions using SVD.

    This method centers model predictions, performs SVD decomposition, and retains
    the specified number of components for subsequent training.

    Args:
        property (str): Nuclear property to orthogonalize (e.g., 'BE').
        train_df (pandas.DataFrame): Training data from Dataset.split_data().
        components_kept (int): Number of SVD components to retain.

    Note:
        This method must be called before training. Results are stored in instance attributes.
    """
    # Store selected property
    self.current_property = property

    # Extract the relevant DataFrame for that property
    df = self.data_dict[property].copy()
    self.selected_models_dataset = df  # Store for train() and predict()

    # Extract model outputs (only the model columns)
    models_output_train = train_df[self.models]
    model_predictions_train = models_output_train.values

    # Mean prediction across models (per nucleus)
    predictions_mean_train = np.mean(model_predictions_train, axis=1)

    # Experimental truth values for the property
    centered_experiment_train = (
        train_df[self.truth_column_name].values - predictions_mean_train
    )

    # Center model predictions
    model_predictions_train_centered = (
        model_predictions_train - predictions_mean_train[:, None]
    )

    # Perform SVD
    U, S, Vt = np.linalg.svd(model_predictions_train_centered)

    # Dimensionality reduction
    U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept)  # type: ignore

    # Save for training
    self.centered_experiment_train = centered_experiment_train
    self.U_hat = U_hat
    self.Vt_hat = Vt_hat
    self.S_hat = S_hat
    self.Vt_hat_normalized = Vt_hat_normalized
    self._predictions_mean_train = predictions_mean_train
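
A short sketch, assuming train_df is the training split of the hypothetical 'BE' data from the class-level example:

    bmc.orthogonalize("BE", train_df, components_kept=2)
    # SVD products are now stored on the instance for train() and predict().
    print(bmc.U_hat.shape, bmc.S_hat.shape)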

predict(X)

Predicts values using the trained model combination with uncertainty quantification.

Parameters:

    X (DataFrame): Input data containing model predictions and domain information. Required.

Returns:

    tuple[numpy.ndarray, pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Contains:
        - rndm_m (numpy.ndarray): Full posterior draws (n_samples, n_points).
        - lower_df (pandas.DataFrame): Lower bounds (2.5th percentile) with domain keys.
        - median_df (pandas.DataFrame): Median predictions with domain keys.
        - upper_df (pandas.DataFrame): Upper bounds (97.5th percentile) with domain keys.

Raises:

    ValueError: If orthogonalize() and train() haven't been called.

Source code in pybmc/bmc.py
def predict(self, X):
    """
    Predicts values using the trained model combination with uncertainty quantification.

    Args:
        X (pandas.DataFrame): Input data containing model predictions and domain information.

    Returns:
        tuple[numpy.ndarray, pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Contains:
            - rndm_m (numpy.ndarray): Full posterior draws (n_samples, n_points).
            - lower_df (pandas.DataFrame): Lower bounds (2.5th percentile) with domain keys.
            - median_df (pandas.DataFrame): Median predictions with domain keys.
            - upper_df (pandas.DataFrame): Upper bounds (97.5th percentile) with domain keys.

    Raises:
        ValueError: If `orthogonalize()` and `train()` haven't been called.
    """
    if self.samples is None or self.Vt_hat is None:
        raise ValueError(
            "Must call `orthogonalize()` and `train()` before predicting."
        )

    if not isinstance(X, pd.DataFrame):
        raise ValueError(
            "X must be a pandas DataFrame containing model predictions and domain info."
        )

    # Infer model columns vs. domain columns
    model_cols = self.models
    domain_keys = [col for col in X.columns if col not in model_cols]

    model_preds = X[model_cols].values
    rndm_m, (lower, median, upper) = rndm_m_random_calculator(
        model_preds, self.samples, self.Vt_hat
    )

    domain_df = X[domain_keys].reset_index(drop=True)

    lower_df = domain_df.copy()
    lower_df["Predicted_Lower"] = lower

    median_df = domain_df.copy()
    median_df["Predicted_Median"] = median

    upper_df = domain_df.copy()
    upper_df["Predicted_Upper"] = upper

    return rndm_m, lower_df, median_df, upper_df
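
A sketch of prediction on held-out rows (e.g., the validation split), assuming orthogonalize() and train() have already been called:

    rndm_m, lower_df, median_df, upper_df = bmc.predict(val_df)
    print(median_df.head())  # domain columns plus 'Predicted_Median'
    print(rndm_m.shape)      # (n_samples, n_points)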

predict2(property)

Predicts values for a specific property using the trained model combination.

This version uses the property name instead of a DataFrame input.

Parameters:

    property (str): Property name to predict (e.g., 'ChRad'). Required.

Returns:

    tuple[numpy.ndarray, pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Contains:
        - rndm_m (numpy.ndarray): Full posterior draws (n_samples, n_points).
        - lower_df (pandas.DataFrame): Lower bounds (2.5th percentile) with domain keys.
        - median_df (pandas.DataFrame): Median predictions with domain keys.
        - upper_df (pandas.DataFrame): Upper bounds (97.5th percentile) with domain keys.

Raises:

    ValueError: If orthogonalize() and train() haven't been called.
    KeyError: If property not found in data_dict.

Source code in pybmc/bmc.py
def predict2(self, property):
    """
    Predicts values for a specific property using the trained model combination.

    This version uses the property name instead of a DataFrame input.

    Args:
        property (str): Property name to predict (e.g., 'ChRad').

    Returns:
        tuple[numpy.ndarray, pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Contains:
            - rndm_m (numpy.ndarray): Full posterior draws (n_samples, n_points).
            - lower_df (pandas.DataFrame): Lower bounds (2.5th percentile) with domain keys.
            - median_df (pandas.DataFrame): Median predictions with domain keys.
            - upper_df (pandas.DataFrame): Upper bounds (97.5th percentile) with domain keys.

    Raises:
        ValueError: If `orthogonalize()` and `train()` haven't been called.
        KeyError: If property not found in `data_dict`.
    """
    if self.samples is None or self.Vt_hat is None:
        raise ValueError(
            "Must call `orthogonalize()` and `train()` before predicting."
        )

    if property not in self.data_dict:
        raise KeyError(f"Property '{property}' not found in data_dict.")

    df = self.data_dict[property].copy()

    # Infer domain and model columns
    full_model_cols = self.models
    domain_keys = [
        col
        for col in df.columns
        if col not in full_model_cols and col != self.truth_column_name
    ]

    # Determine which models are present
    available_models = [m for m in df.columns if m in self.models]

    # Sets
    trained_models_set = set(self.models)
    available_models_set = set(available_models)

    missing_models = trained_models_set - available_models_set
    extra_models = available_models_set - trained_models_set
    print(f"Available models: {available_models_set}")
    print(f"Trained models: {trained_models_set}")

    if len(extra_models) > 0:
        raise ValueError(
            f"ERROR: Property '{property}' contains extra models not present during training: {list(extra_models)}. "
            "You must retrain if using a larger model space."
        )

    if len(missing_models) > 0:
        print(
            f"WARNING: Predicting on property '{property}' with missing models: {list(missing_models)}"
        )
        print(
            "         The trained model weights include these models — prediction will proceed, but results may not be statistically accurate."
        )

    if len(available_models) == 0:
        raise ValueError(
            "No available trained models are present in prediction DataFrame."
        )

    # Filter predictions and model weights
    model_preds = df[available_models].values
    domain_df = df[domain_keys].reset_index(drop=True)

    # Find indices of available models in training order
    model_indices = [self.models.index(m) for m in available_models]

    # Reduce Vt_hat and samples to only use available models
    Vt_hat_reduced = self.Vt_hat[:, model_indices]

    rndm_m, (lower, median, upper) = rndm_m_random_calculator(
        model_preds, self.samples, Vt_hat_reduced
    )

    # Build output DataFrames
    lower_df = domain_df.copy()
    lower_df["Predicted_Lower"] = lower

    median_df = domain_df.copy()
    median_df["Predicted_Median"] = median

    upper_df = domain_df.copy()
    upper_df["Predicted_Upper"] = upper

    return rndm_m, lower_df, median_df, upper_df
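
Example: a minimal sketch along the same lines, again assuming a trained object `bmc` whose data_dict contains a property named 'ChRad' (the property name used in the docstring example).

    rndm_m, lower_df, median_df, upper_df = bmc.predict2("ChRad")
    # lower_df / median_df / upper_df carry the domain columns of
    # data_dict['ChRad'] plus the corresponding 'Predicted_*' column.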

train(training_options=None)

Trains the model combination using Gibbs sampling.

Parameters:

    training_options (dict, optional): Training configuration (default: None). Options:
        - iterations (int): Number of Gibbs iterations (default: 50000).
        - sampler (str): 'gibbs_sampling' or 'simplex' (default: 'gibbs_sampling').
        - burn (int): Burn-in iterations for simplex sampler (default: 10000).
        - stepsize (float): Proposal step size for simplex sampler (default: 0.001).
        - b_mean_prior (numpy.ndarray): Prior mean vector (default: zeros).
        - b_mean_cov (numpy.ndarray): Prior covariance matrix (default: diag(S_hat²)).
        - nu0_chosen (float): Degrees of freedom for variance prior (default: 1.0).
        - sigma20_chosen (float): Prior variance (default: 0.02).

Note:

    Requires prior call to orthogonalize(). Stores posterior samples in self.samples.

Source code in pybmc/bmc.py
def train(self, training_options=None):
    """
    Trains the model combination using Gibbs sampling.

    Args:
        training_options (dict, optional): Training configuration. Options:
            - iterations (int): Number of Gibbs iterations (default: 50000).
            - sampler (str): 'gibbs_sampling' or 'simplex' (default: 'gibbs_sampling').
            - burn (int): Burn-in iterations for simplex sampler (default: 10000).
            - stepsize (float): Proposal step size for simplex sampler (default: 0.001).
            - b_mean_prior (numpy.ndarray): Prior mean vector (default: zeros).
            - b_mean_cov (numpy.ndarray): Prior covariance matrix (default: diag(S_hat²)).
            - nu0_chosen (float): Degrees of freedom for variance prior (default: 1.0).
            - sigma20_chosen (float): Prior variance (default: 0.02).

    Note:
        Requires prior call to `orthogonalize()`. Stores posterior samples in `self.samples`.
    """

    if training_options is None:
        training_options = {}

    # functions defined so that whenever a key not specified, we print out the default value for users
    def get_option(key, default):
        if key not in training_options:
            print(f"[INFO] Using default value for '{key}': {default}")
        return training_options.get(key, default)

    iterations = get_option("iterations", 50000)
    sampler = get_option("sampler", "gibbs_sampling")
    burn = get_option("burn", 10000)
    stepsize = get_option("stepsize", 0.001)

    S_hat = self.S_hat
    num_components = self.U_hat.shape[1]

    b_mean_prior = get_option("b_mean_prior", np.zeros(num_components))
    b_mean_cov = get_option("b_mean_cov", np.diag(S_hat**2))
    nu0_chosen = get_option("nu0_chosen", 1.0)
    sigma20_chosen = get_option("sigma20_chosen", 0.02)

    if sampler == "simplex":
        self.samples = gibbs_sampler_simplex(
            self.centered_experiment_train,
            self.U_hat,
            self.Vt_hat,
            self.S_hat,
            iterations,
            [
                nu0_chosen,
                sigma20_chosen,
            ],  # Note: no b_mean_prior/b_mean_cov needed
            burn=burn,
            stepsize=stepsize,
        )
    else:
        self.samples = gibbs_sampler(
            self.centered_experiment_train,
            self.U_hat,
            iterations,
            [b_mean_prior, b_mean_cov, nu0_chosen, sigma20_chosen],
        )
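
Example: a minimal sketch of configuring the sampler through training_options, assuming `bmc` has already been orthogonalized; the specific numbers are illustrative, not recommendations.

    training_options = {
        "iterations": 20000,            # fewer Gibbs iterations than the default 50000
        "sampler": "gibbs_sampling",    # or "simplex" for simplex-constrained weights
        "nu0_chosen": 1.0,
        "sigma20_chosen": 0.02,
    }
    bmc.train(training_options=training_options)
    # Unspecified options are reported with an [INFO] line and their defaults are used.
    print(bmc.samples.shape)  # (iterations, n_components + 1): coefficients plus noise std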

USVt_hat_extraction(U, S, Vt, components_kept)

Extracts reduced-dimensionality matrices from SVD results.

Parameters:

    U (numpy.ndarray): Left singular vectors. [required]
    S (numpy.ndarray): Singular values. [required]
    Vt (numpy.ndarray): Right singular vectors (transposed). [required]
    components_kept (int): Number of components to retain. [required]

Returns:

    tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:
        - U_hat (numpy.ndarray): Reduced left singular vectors.
        - S_hat (numpy.ndarray): Retained singular values.
        - Vt_hat (numpy.ndarray): Normalized right singular vectors.
        - Vt_hat_normalized (numpy.ndarray): Original right singular vectors.

Source code in pybmc/inference_utils.py
def USVt_hat_extraction(U, S, Vt, components_kept):
    """
    Extracts reduced-dimensionality matrices from SVD results.

    Args:
        U (numpy.ndarray): Left singular vectors.
        S (numpy.ndarray): Singular values.
        Vt (numpy.ndarray): Right singular vectors (transposed).
        components_kept (int): Number of components to retain.

    Returns:
        tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:
            - `U_hat` (numpy.ndarray): Reduced left singular vectors.
            - `S_hat` (numpy.ndarray): Retained singular values.
            - `Vt_hat` (numpy.ndarray): Normalized right singular vectors.
            - `Vt_hat_normalized` (numpy.ndarray): Original right singular vectors.
    """
    U_hat = np.array([U.T[i] for i in range(components_kept)]).T
    S_hat = S[:components_kept]
    Vt_hat = np.array([Vt[i] / S[i] for i in range(components_kept)])
    Vt_hat_normalized = np.array([Vt[i] for i in range(components_kept)])
    return U_hat, S_hat, Vt_hat, Vt_hat_normalized
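
Example: a self-contained sketch on synthetic data, assuming the function can be imported from pybmc.inference_utils (the module its source lives in).

    import numpy as np
    from pybmc.inference_utils import USVt_hat_extraction

    rng = np.random.default_rng(0)
    A = rng.standard_normal((50, 4))   # e.g., 50 data points x 4 centered model predictions

    U, S, Vt = np.linalg.svd(A, full_matrices=False)
    U_hat, S_hat, Vt_hat, Vt_hat_normalized = USVt_hat_extraction(U, S, Vt, components_kept=2)

    print(U_hat.shape)              # (50, 2)
    print(S_hat.shape)              # (2,)
    print(Vt_hat.shape)             # (2, 4): rows are Vt[i] / S[i]
    print(Vt_hat_normalized.shape)  # (2, 4): unscaled rows of Vt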

gibbs_sampler(y, X, iterations, prior_info)

Performs Gibbs sampling for Bayesian linear regression.

Parameters:

    y (numpy.ndarray): Response vector (centered). [required]
    X (numpy.ndarray): Design matrix. [required]
    iterations (int): Number of sampling iterations. [required]
    prior_info (tuple[numpy.ndarray, numpy.ndarray, float, float]): Prior parameters. [required]
        - b_mean_prior (numpy.ndarray): Prior mean for coefficients.
        - b_mean_cov (numpy.ndarray): Prior covariance matrix.
        - nu0 (float): Prior degrees of freedom for variance.
        - sigma20 (float): Prior variance.

Returns:

    numpy.ndarray: Posterior samples [beta, sigma].

Source code in pybmc/inference_utils.py
def gibbs_sampler(y, X, iterations, prior_info):
    """
    Performs Gibbs sampling for Bayesian linear regression.

    Args:
        y (numpy.ndarray): Response vector (centered).
        X (numpy.ndarray): Design matrix.
        iterations (int): Number of sampling iterations.
        prior_info (tuple[numpy.ndarray, numpy.ndarray, float, float]): Prior parameters:
            - `b_mean_prior` (numpy.ndarray): Prior mean for coefficients.
            - `b_mean_cov` (numpy.ndarray): Prior covariance matrix.
            - `nu0` (float): Prior degrees of freedom for variance.
            - `sigma20` (float): Prior variance.

    Returns:
        numpy.ndarray: Posterior samples `[beta, sigma]`.
    """
    b_mean_prior, b_mean_cov, nu0, sigma20 = prior_info
    b_mean_cov_inv = np.linalg.inv(b_mean_cov)
    n = len(y)

    X_T_X = X.T.dot(X)
    X_T_X_inv = np.linalg.inv(X_T_X)

    b_data = X_T_X_inv.dot(X.T).dot(y)
    supermodel = X.dot(b_data)
    residuals = y - supermodel
    sigma2 = np.sum(residuals**2) / len(residuals)
    cov_matrix = sigma2 * X_T_X_inv

    samples = []

    # Initialize sigma2 with a small positive value to avoid division by zero
    sigma2 = max(sigma2, 1e-6)

    for i in range(iterations):
        # Regularize the covariance matrix to ensure it is positive definite
        cov_matrix = np.linalg.inv(X_T_X / sigma2 + b_mean_cov_inv + np.eye(X_T_X.shape[0]) * 1e-6)
        mean_vector = cov_matrix.dot(
            b_mean_cov_inv.dot(b_mean_prior) + X.T.dot(y) / sigma2
        )
        b_current = np.random.multivariate_normal(mean_vector, cov_matrix)

        # Sample from the conditional posterior of sigma2 given bs and data
        supermodel = X.dot(b_current)
        residuals = y - supermodel
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 + np.sum(residuals**2)) / 2.0
        sigma2 = max(1 / np.random.default_rng().gamma(shape_post, 1 / scale_post), 1e-6)

        samples.append(np.append(b_current, np.sqrt(sigma2)))

    return np.array(samples)
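
Example: a self-contained sketch on synthetic data, again assuming the function can be imported from pybmc.inference_utils; the prior values are illustrative.

    import numpy as np
    from pybmc.inference_utils import gibbs_sampler

    rng = np.random.default_rng(1)
    n, k = 200, 2                       # data points, retained components
    X = rng.standard_normal((n, k))     # design matrix (e.g., U_hat)
    beta_true = np.array([0.5, -0.3])
    y = X @ beta_true + 0.1 * rng.standard_normal(n)   # centered response

    prior_info = [np.zeros(k),  # b_mean_prior
                  np.eye(k),    # b_mean_cov
                  1.0,          # nu0
                  0.02]         # sigma20
    samples = gibbs_sampler(y, X, iterations=2000, prior_info=prior_info)
    print(samples.shape)                   # (2000, 3): two coefficients plus noise std
    print(samples[500:, :k].mean(axis=0))  # posterior means, close to beta_true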

gibbs_sampler_simplex(y, X, Vt_hat, S_hat, iterations, prior_info, burn=10000, stepsize=0.001)

Performs Gibbs sampling with simplex constraints on model weights.

Parameters:

    y (numpy.ndarray): Centered response vector. [required]
    X (numpy.ndarray): Design matrix of principal components. [required]
    Vt_hat (numpy.ndarray): Normalized right singular vectors. [required]
    S_hat (numpy.ndarray): Singular values. [required]
    iterations (int): Number of sampling iterations. [required]
    prior_info (list[float]): [nu0, sigma20] - prior parameters for variance. [required]
    burn (int, optional): Burn-in iterations (default: 10000).
    stepsize (float, optional): Proposal step size (default: 0.001).

Returns:

    numpy.ndarray: Posterior samples [beta, sigma].

Source code in pybmc/inference_utils.py
def gibbs_sampler_simplex(
    y, X, Vt_hat, S_hat, iterations, prior_info, burn=10000, stepsize=0.001
):
    """
    Performs Gibbs sampling with simplex constraints on model weights.

    Args:
        y (numpy.ndarray): Centered response vector.
        X (numpy.ndarray): Design matrix of principal components.
        Vt_hat (numpy.ndarray): Normalized right singular vectors.
        S_hat (numpy.ndarray): Singular values.
        iterations (int): Number of sampling iterations.
        prior_info (list[float]): `[nu0, sigma20]` - prior parameters for variance.
        burn (int, optional): Burn-in iterations (default: 10000).
        stepsize (float, optional): Proposal step size (default: 0.001).

    Returns:
        numpy.ndarray: Posterior samples `[beta, sigma]`.
    """
    bias0 = np.full(len(Vt_hat.T), 1 / len(Vt_hat.T))
    nu0, sigma20 = prior_info
    cov_matrix_step = np.diag(S_hat**2 * stepsize**2)
    n = len(y)
    b_current = np.full(len(X.T), 0)
    supermodel_current = X.dot(b_current)
    residuals_current = y - supermodel_current
    log_likelihood_current = -np.sum(residuals_current**2)
    sigma2 = -log_likelihood_current / len(residuals_current)
    samples = []
    acceptance = 0

    # Validate inputs
    if burn < 0:
        raise ValueError("Burn-in iterations must be non-negative.")
    if stepsize <= 0:
        raise ValueError("Stepsize must be positive.")

    # Burn-in phase
    for i in range(burn):
        b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step)
        omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0

        # Skip proposals with negative weights
        if not np.any(omegas_proposed < 0):
            supermodel_proposed = X.dot(b_proposed)
            residuals_proposed = y - supermodel_proposed
            log_likelihood_proposed = -np.sum(residuals_proposed**2)
            acceptance_prob = min(
                1,
                np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2),
            )
            if np.random.uniform() < acceptance_prob:
                b_current = np.copy(b_proposed)
                log_likelihood_current = log_likelihood_proposed

        # Sample variance
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0
        sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post)

    # Sampling phase
    for i in range(iterations):
        b_proposed = np.random.multivariate_normal(b_current, cov_matrix_step)
        omegas_proposed = np.dot(b_proposed, Vt_hat) + bias0

        if not np.any(omegas_proposed < 0):
            supermodel_proposed = X.dot(b_proposed)
            residuals_proposed = y - supermodel_proposed
            log_likelihood_proposed = -np.sum(residuals_proposed**2)
            acceptance_prob = min(
                1,
                np.exp((log_likelihood_proposed - log_likelihood_current) / sigma2),
            )
            if np.random.uniform() < acceptance_prob:
                b_current = np.copy(b_proposed)
                log_likelihood_current = log_likelihood_proposed
                acceptance += 1

        # Sample variance
        shape_post = (nu0 + n) / 2.0
        scale_post = (nu0 * sigma20 - log_likelihood_current) / 2.0
        sigma2 = 1 / np.random.default_rng().gamma(shape_post, 1 / scale_post)
        samples.append(np.append(b_current, np.sqrt(sigma2)))

    print(f"Acceptance rate: {acceptance/iterations*100:.2f}%")
    return np.array(samples)
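
Example: a self-contained sketch that chains USVt_hat_extraction with the simplex sampler on synthetic data. The preprocessing here is a stand-in for the package's own orthogonalization step, and the import path is assumed from the source location.

    import numpy as np
    from pybmc.inference_utils import USVt_hat_extraction, gibbs_sampler_simplex

    rng = np.random.default_rng(2)
    n, m = 100, 3                          # data points, models
    preds = rng.standard_normal((n, m))    # synthetic, already-centered model predictions
    y = preds.mean(axis=1) + 0.05 * rng.standard_normal(n)   # synthetic centered response

    U, S, Vt = np.linalg.svd(preds, full_matrices=False)
    U_hat, S_hat, Vt_hat, _ = USVt_hat_extraction(U, S, Vt, components_kept=2)

    samples = gibbs_sampler_simplex(
        y, U_hat, Vt_hat, S_hat,
        iterations=2000, prior_info=[1.0, 0.02],
        burn=500, stepsize=0.001,
    )
    # Prints the acceptance rate, then returns the chain.
    print(samples.shape)   # (2000, 3): two coefficients plus noise std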

coverage(percentiles, rndm_m, models_output, truth_column)

Calculates coverage percentages for credible intervals.

Parameters:

    percentiles (list[int]): Percentiles to evaluate (e.g., [5, 10, ..., 95]). [required]
    rndm_m (numpy.ndarray): Posterior samples of predictions. [required]
    models_output (pandas.DataFrame): DataFrame containing true values. [required]
    truth_column (str): Name of column with true values. [required]

Returns:

    list[float]: Coverage percentages for each percentile.

Source code in pybmc/sampling_utils.py
def coverage(percentiles, rndm_m, models_output, truth_column):
    """
    Calculates coverage percentages for credible intervals.

    Args:
        percentiles (list[int]): Percentiles to evaluate (e.g., `[5, 10, ..., 95]`).
        rndm_m (numpy.ndarray): Posterior samples of predictions.
        models_output (pandas.DataFrame): DataFrame containing true values.
        truth_column (str): Name of column with true values.

    Returns:
        list[float]: Coverage percentages for each percentile.
    """
    #  How often the model’s credible intervals actually contain the true value
    data_total = len(rndm_m.T)  # Number of data points
    M_evals = len(rndm_m)  # Number of samples
    data_true = models_output[truth_column].tolist()

    coverage_results = []

    for p in percentiles:
        count_covered = 0
        for i in range(data_total):
            # Sort model evaluations for the i-th data point
            sorted_evals = np.sort(rndm_m.T[i])
            # Find indices for lower and upper bounds of the credible interval
            lower_idx = int((0.5 - p / 200) * M_evals)
            upper_idx = int((0.5 + p / 200) * M_evals) - 1
            # Check if the true value y[i] is within this interval
            if sorted_evals[lower_idx] <= data_true[i] <= sorted_evals[upper_idx]:
                count_covered += 1
        coverage_results.append(count_covered / data_total * 100)

    return coverage_results
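
Example: a self-contained sketch with synthetic, well-calibrated draws, assuming the function can be imported from pybmc.sampling_utils.

    import numpy as np
    import pandas as pd
    from pybmc.sampling_utils import coverage

    rng = np.random.default_rng(3)
    n_draws, n_points = 5000, 40
    truth = rng.normal(size=n_points)
    rndm_m = truth + rng.normal(size=(n_draws, n_points))   # draws centered on the truth

    models_output = pd.DataFrame({"truth": truth})
    print(coverage([50, 80, 95], rndm_m, models_output, truth_column="truth"))
    # For well-calibrated draws the result is roughly [50.0, 80.0, 95.0].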

rndm_m_random_calculator(filtered_model_predictions, samples, Vt_hat)

Generates posterior predictive samples and credible intervals.

Parameters:

    filtered_model_predictions (numpy.ndarray): Model predictions. [required]
    samples (numpy.ndarray): Gibbs samples [beta, sigma]. [required]
    Vt_hat (numpy.ndarray): Normalized right singular vectors. [required]

Returns:

    tuple[numpy.ndarray, list[numpy.ndarray]]:
        - rndm_m (numpy.ndarray): Posterior predictive samples.
        - [lower, median, upper] (list[numpy.ndarray]): Credible interval arrays.

Source code in pybmc/sampling_utils.py
def rndm_m_random_calculator(filtered_model_predictions, samples, Vt_hat):
    """
    Generates posterior predictive samples and credible intervals.

    Args:
        filtered_model_predictions (numpy.ndarray): Model predictions.
        samples (numpy.ndarray): Gibbs samples `[beta, sigma]`.
        Vt_hat (numpy.ndarray): Normalized right singular vectors.

    Returns:
        tuple[numpy.ndarray, list[numpy.ndarray]]:
            - `rndm_m` (numpy.ndarray): Posterior predictive samples.
            - `[lower, median, upper]` (list[numpy.ndarray]): Credible interval arrays.
    """
    # Seed the Generator directly so the draws below are reproducible;
    # np.random.seed() has no effect on np.random.default_rng().
    rng = np.random.default_rng(142858)

    theta_rand_selected = rng.choice(samples, 10000, replace=False)

    # Extract betas and noise std deviations
    betas = theta_rand_selected[:, :-1]  # shape: (10000, n_components_kept)
    noise_stds = theta_rand_selected[:, -1]  # shape: (10000,)

    # Compute model weights: shape (10000, num_models)
    default_weights = np.full(Vt_hat.shape[1], 1 / Vt_hat.shape[1])
    model_weights_random = (
        betas @ Vt_hat + default_weights
    )  # broadcasting default_weights

    # Generate noiseless predictions: shape (10000, num_data_points)
    yvals_rand_radius = (
        model_weights_random @ filtered_model_predictions.T
    )  # dot product

    # Add Gaussian noise with std = noise_stds (assume diagonal covariance)
    # We'll use broadcasting: noise_stds[:, None] * standard normal noise
    noise = rng.standard_normal(yvals_rand_radius.shape) * noise_stds[:, None]
    rndm_m = yvals_rand_radius + noise

    # Compute credible intervals
    lower_radius = np.percentile(rndm_m, 2.5, axis=0)
    median_radius = np.percentile(rndm_m, 50, axis=0)
    upper_radius = np.percentile(rndm_m, 97.5, axis=0)

    return rndm_m, [lower_radius, median_radius, upper_radius]
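
Example: a self-contained sketch with toy posterior samples; only the shapes matter here. Note that the function draws 10000 of the supplied samples without replacement, so at least that many rows are needed. The import path is assumed from the source location.

    import numpy as np
    from pybmc.sampling_utils import rndm_m_random_calculator

    rng = np.random.default_rng(4)
    n_points, n_models, k = 30, 3, 2
    model_preds = rng.standard_normal((n_points, n_models))
    Vt_hat = rng.standard_normal((k, n_models))

    # Toy posterior: k coefficients plus a positive noise std in the last column.
    samples = np.hstack([
        rng.normal(scale=0.05, size=(12000, k)),
        np.abs(rng.normal(loc=0.1, scale=0.01, size=(12000, 1))),
    ])

    rndm_m, (lower, median, upper) = rndm_m_random_calculator(model_preds, samples, Vt_hat)
    print(rndm_m.shape)   # (10000, 30)
    print(median.shape)   # (30,)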