Source code for nustattools.stats._derate

"""Derate a covariance to accommodate unknown correlations."""

from __future__ import annotations

from typing import Any

import numpy as np
from numba import njit
from numpy.typing import ArrayLike, NDArray
from scipy.linalg import block_diag, sqrtm
from scipy.stats import chi2


@njit()  # type: ignore[misc]
def _fix(cov: NDArray[Any]) -> NDArray[Any]:
    changed = True
    while changed:
        changed = False
        for k in range(len(cov)):
            for j in range(k + 1, len(cov)):
                # pivot point k, m
                pivot = np.sign(cov[k, j])
                if not np.isfinite(pivot):
                    continue
                if np.all(np.isfinite(cov[k, :])) and np.all(np.isfinite(cov[:, j])):
                    continue
                for m in range(len(cov)):
                    if (np.isfinite(cov[k, m]) and not np.isfinite(cov[m, j])) and (
                        cov[k, m] != 0 or pivot != 0
                    ):
                        cov[j, m] = cov[m, j] = np.sign(cov[k, m] * pivot)
                        changed = True
                    elif (not np.isfinite(cov[k, m]) and np.isfinite(cov[m, j])) and (
                        cov[m, j] != 0 or pivot != 0
                    ):
                        cov[m, k] = cov[k, m] = np.sign(cov[m, j] * pivot)
                        changed = True
    return cov


def fill_max_correlation(cor: ArrayLike, target: ArrayLike) -> NDArray[Any]:
    """Fill the correlation matrix with elements to achieve maximum correlation.

    Try to match the signs in `target`.

    Only replaces elements in `cor` that are ``np.nan``.
    """

    cora = np.array(cor)
    target = np.asarray(target)

    # Check and fix connections to other elements
    cora = _fix(cora)

    priority = np.unravel_index(
        np.argsort(np.abs(target), axis=None)[::-1], target.shape
    )

    for i, j in zip(*priority):
        if np.isfinite(cora[i, j]):
            continue

        # Set the new element
        t = 1 if target[i, j] == 0 else np.sign(target[i, j])

        cora[i, j] = cora[j, i] = t

        # Check and fix connections to other elements
        cora = _fix(cora)

    return cora


def get_blocks(cov: NDArray[Any]) -> list[int]:
    """Determine the sizes of known block matrices.

    Assumes the matrix is symmetric.

    """

    blocks = []
    n = 1

    # Find blocks by looking at NaNs
    nans = np.isnan(cov)
    trans = np.any(nans[:-1] ^ nans[1:], axis=1)

    # Find blocks by looking at zeros on diagonal
    zeros = np.diag(cov) == 0
    trans |= zeros[:-1] ^ zeros[1:]

    for j in range(cov.shape[0] - 1):
        if trans[j]:
            blocks.append(n)
            n = 1
        else:
            n += 1

    # Add last block
    blocks.append(n)

    return blocks


def get_whitening_transform(cov: NDArray[Any]) -> tuple[NDArray[Any], NDArray[Any]]:
    """Get the blockwise whitening matrix and inverse."""

    blocks = get_blocks(cov)
    W_l = []
    Wi_l = []
    i = 0
    for n in blocks:
        c = cov[i : i + n, :][:, i : i + n]
        if np.all(c == 0):
            W_l.append(np.zeros_like(c))
            Wi_l.append(np.zeros_like(c))
        else:
            sc = sqrtm(c)
            W_l.append(np.linalg.pinv(sc))
            Wi_l.append(sc)
        i += n

    return np.asarray(block_diag(*W_l)), np.asarray(block_diag(*Wi_l))


[docs] def derate_covariance( cov: list[NDArray[Any]] | NDArray[Any], *, jacobian: ArrayLike | None = None, sigma: float = 3.0, accuracy: float = 0.01, return_dict: dict[str, Any] | None = None, ) -> float: """Derate the covariance of some data to account for unknown correlations. See TODO: Ref to paper. Parameters ---------- cov : numpy.ndarray or list of numpy.ndarray The covariance matrix of the data or a list of covariances that add up to the total. Unknown covariance blocks must be ``np.nan``. Off diagonal blocks may only be ``0'' or ``np.nan''. Diagonal blocks must not be ``np.nan''. jacobian : numpy.ndarray, default=None Jacobian matrix of the model prediction wrt the best-fit parameters. sigma : float, default=3. The desired confidence level up to which the derated covariance should be conservative, expressed in standard-normal standard deviations. E.g. ``sigma=3.`` corresponds to ``CL=0.997``. accuracy : float, default=0.01 The derating factor is calculated using numerical sampling. This parameter determines how many samples to throw. Lower values mean more samples. return_dict : dict, optional If specified, the nightmare covariance and thrown data samples are added to this dictionary for detailed studies outside the function. Returns ------- a : float The derating factor for the total covariance. """ # Make sure we have a list of covariances if isinstance(cov, list): covl = [np.asarray(item) for item in cov] else: covl = [np.asarray(cov)] # Assumed covariance # All unknown elements set to 0. cov_0_l = [np.nan_to_num(c) for c in covl] cov_0 = np.sum(cov_0_l, axis=0) cov_0_inv = np.linalg.inv(cov_0) # If no Jacobian is specified, assume we cover full parameter space n_data = covl[0].shape[0] if jacobian is None: jacobian = np.eye(n_data) else: jacobian = np.asarray(jacobian) # Transform to whitened coordinate systems and calculate "nightmare_cov" # covariance, then transform back nightmare_cov = np.zeros_like(cov_0) for c, c0 in zip(covl, cov_0_l): # Determine the whitening transform for each covariance W, Wi = get_whitening_transform(c) # Whitened correlation is identity matrix cor = np.eye(len(c0)) # Set unknowns back to NaN cor[np.isnan(c)] = np.nan # Assumed total covariance in whitened coordinates S = W @ cov_0 @ W.T Si = np.linalg.pinv(S) A = W @ jacobian Q = np.linalg.pinv(A.T @ Si @ A) @ A.T @ Si P = A @ Q T = Si @ P cor_nightmare = fill_max_correlation(cor, T) # Transform back to non-whitened coordinates cov_nightmare = Wi @ cor_nightmare @ Wi.T nightmare_cov = nightmare_cov + cov_nightmare # Desired significance alpha = chi2.sf(sigma**2, df=1) # Assumed critical value in parameter space n_param = jacobian.shape[1] crit_0 = chi2.isf(alpha, df=n_param) # Nightmare critical value from random throws rng = np.random.default_rng() # Matrix that solves the least squares problem # Uses assumed covariance parameter_estimator = ( np.linalg.inv(jacobian.T @ cov_0_inv @ jacobian) @ jacobian.T @ cov_0_inv ) # Assumed covariance in parameter space assumed_parameter_cov = parameter_estimator @ cov_0 @ parameter_estimator.T assumed_parameter_cov_inv = np.linalg.inv(assumed_parameter_cov) # Actual nightmare_cov covariance nightmare_parameter_cov = ( parameter_estimator @ nightmare_cov @ parameter_estimator.T ) # Estimate necessary precision # var = alpha(1-alpha) / (n f(crit_0)**2) =!= (crit_0 * rel_error)**2 n_throws = ( int( (alpha * (1.0 - alpha)) / (chi2.pdf(crit_0, df=n_param) ** 2 * (crit_0 * accuracy) ** 2) ) + 1 ) throws = rng.multivariate_normal( mean=[0.0] * n_param, cov=nightmare_parameter_cov, size=n_throws ) dist = np.einsum("ai,ij,aj->a", throws, assumed_parameter_cov_inv, throws) crit_nightmare = -np.quantile(-dist, alpha) derate = crit_nightmare / crit_0 derate = max(1.0, derate) if return_dict is not None: return_dict["nightmare_cov"] = nightmare_cov return_dict["throws"] = throws return float(derate)
__all__ = ["derate_covariance"]