"""
Generalized Hyperbolic (GH) marginal distribution :math:`f(x)`.
The marginal distribution of :math:`X` obtained by integrating out :math:`Y`:
.. math::
f(x) = \\int_0^\\infty f(x, y) \\, dy = \\int_0^\\infty f(x | y) f(y) \\, dy
where:
.. math::
X | Y \\sim N(\\mu + \\gamma Y, \\Sigma Y)
Y \\sim \\text{GIG}(p, a, b)
The marginal PDF has a closed form involving modified Bessel functions.
Note: The marginal distribution is NOT an exponential family, but the joint
distribution :math:`f(x, y)` IS an exponential family (accessible via ``.joint``).
Special cases:
- **Variance Gamma (VG)**: :math:`b \\to 0` (GIG → Gamma)
- **Normal-Inverse Gaussian (NIG)**: :math:`p = -1/2` (GIG → InverseGaussian)
- **Normal-Inverse Gamma (NInvG)**: :math:`a \\to 0` (GIG → InverseGamma)
- **Hyperbolic**: :math:`p = 1`
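
Illustrative use of one of the special-case constructors defined below
(a sketch; assumes ``GeneralizedHyperbolic`` and ``numpy`` are imported)::

    >>> nig_like = GeneralizedHyperbolic.as_normal_inverse_gaussian(
    ...     mu=np.zeros(1), gamma=np.array([0.2]), sigma=np.eye(1),
    ...     delta=1.0, eta=2.0)  # GIG(p=-1/2, a=eta/delta**2, b=eta)  # doctest: +SKIP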
"""
import numpy as np
from numpy.typing import ArrayLike, NDArray
from typing import Any, Callable, Dict, Optional, Union
from normix.base import NormalMixture, JointNormalMixture
from normix.utils import log_kv
from .joint_generalized_hyperbolic import JointGeneralizedHyperbolic
# ============================================================================
# Parameter Regularization Methods
# ============================================================================
def regularize_det_sigma_one(
mu: NDArray,
gamma: NDArray,
sigma: NDArray,
p: float,
a: float,
b: float,
d: int,
log_det_sigma: Optional[float] = None
) -> Dict[str, Any]:
"""
Regularize GH parameters by enforcing :math:`|\\Sigma| = 1`.
This is the recommended regularization that doesn't affect convergence
of the EM algorithm. Following the legacy implementation:
.. math::
(\\mu, \\gamma, \\Sigma, p, a, b) \\to
(\\mu, |\\Sigma|^{-1/d} \\gamma, |\\Sigma|^{-1/d} \\Sigma,
p, |\\Sigma|^{-1/d} a, |\\Sigma|^{1/d} b)
Parameters
----------
mu, gamma, sigma, p, a, b : parameters
Classical GH parameters.
d : int
Dimension of X.
log_det_sigma : float, optional
Precomputed log determinant of Sigma. If provided, avoids
recomputing the determinant.
Returns
-------
params : dict
Regularized parameters with |Σ| = 1.
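
    Examples
    --------
    A quick sanity check of the determinant constraint (illustrative;
    exact for this input):

    >>> import numpy as np
    >>> out = regularize_det_sigma_one(
    ...     mu=np.zeros(2), gamma=np.ones(2), sigma=4.0 * np.eye(2),
    ...     p=1.0, a=1.0, b=1.0, d=2)
    >>> float(np.linalg.det(out['sigma']))  # |Sigma| = 16 -> scale = 4
    1.0
    >>> float(out['a']), float(out['b'])    # a / 4, b * 4
    (0.25, 4.0)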
"""
if log_det_sigma is not None:
# Use precomputed log determinant
det_sigma = np.exp(log_det_sigma)
else:
det_sigma = np.linalg.det(sigma)
if det_sigma <= 0:
# Sigma is degenerate, can't regularize
return {'mu': mu, 'gamma': gamma, 'sigma': sigma, 'p': p, 'a': a, 'b': b}
scale = det_sigma ** (1.0 / d)
return {
'mu': mu,
'gamma': gamma / scale,
'sigma': sigma / scale,
'p': p,
'a': a / scale, # Legacy: psi = psi / scale
'b': b * scale # Legacy: chi = chi * scale
}
def regularize_sigma_diagonal_one(
mu: NDArray,
gamma: NDArray,
sigma: NDArray,
p: float,
a: float,
b: float,
d: int
) -> Dict[str, Any]:
"""
Regularize by enforcing the first diagonal element of :math:`\\Sigma` to be 1.
.. math::
\\Sigma_{11} = 1
This is an alternative regularization useful in some applications.
Parameters
----------
mu, gamma, sigma, p, a, b : parameters
Classical GH parameters.
d : int
Dimension of X.
Returns
-------
params : dict
Regularized parameters with Σ₁₁ = 1.
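
    Examples
    --------
    Illustrative check of the constraint (exact for this input):

    >>> import numpy as np
    >>> out = regularize_sigma_diagonal_one(
    ...     mu=np.zeros(2), gamma=np.ones(2), sigma=2.0 * np.eye(2),
    ...     p=1.0, a=1.0, b=1.0, d=2)
    >>> float(out['sigma'][0, 0])
    1.0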
"""
scale = sigma[0, 0]
if scale <= 0:
return {'mu': mu, 'gamma': gamma, 'sigma': sigma, 'p': p, 'a': a, 'b': b}
return {
'mu': mu,
'gamma': gamma / scale,
'sigma': sigma / scale,
'p': p,
'a': a / scale,
'b': b * scale
}
def regularize_fix_p(
mu: NDArray,
gamma: NDArray,
sigma: NDArray,
p: float,
a: float,
b: float,
d: int,
p_fixed: float = -0.5
) -> Dict[str, Any]:
"""
Regularize by fixing the GIG parameter :math:`p`.
This converts GH to a special case:
- p = -0.5: Normal-Inverse Gaussian
- p > 0 with b → 0: Variance Gamma (approximately)
Parameters
----------
mu, gamma, sigma, p, a, b : parameters
Classical GH parameters.
d : int
Dimension of X.
p_fixed : float, optional
Fixed value for p. Default is -0.5 (NIG).
Returns
-------
params : dict
Regularized parameters with fixed p.
"""
return {
'mu': mu,
'gamma': gamma,
'sigma': sigma,
'p': p_fixed,
'a': a,
'b': b
}
def regularize_none(
mu: NDArray,
gamma: NDArray,
sigma: NDArray,
p: float,
a: float,
b: float,
d: int
) -> Dict[str, Any]:
"""
No regularization (identity function).
Parameters
----------
mu, gamma, sigma, p, a, b : parameters
Classical GH parameters.
d : int
Dimension of X.
Returns
-------
params : dict
Unchanged parameters.
"""
return {'mu': mu, 'gamma': gamma, 'sigma': sigma, 'p': p, 'a': a, 'b': b}
# Dictionary of available regularization methods
REGULARIZATION_METHODS = {
'det_sigma_one': regularize_det_sigma_one,
'sigma_diagonal_one': regularize_sigma_diagonal_one,
'fix_p': regularize_fix_p,
'none': regularize_none,
}
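
# Illustrative dispatch through the registry (a sketch; assumes classical
# parameters are already available as ``mu, gamma, sigma, p, a, b`` with
# dimension ``d``):
#
#     regularize_fn = REGULARIZATION_METHODS['det_sigma_one']
#     params = regularize_fn(mu, gamma, sigma, p, a, b, d)
#     # params['sigma'] now satisfies |Sigma| = 1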
class GeneralizedHyperbolic(NormalMixture):
"""
Generalized Hyperbolic (GH) marginal distribution.
The Generalized Hyperbolic distribution is a normal variance-mean mixture where
the mixing distribution is Generalized Inverse Gaussian (GIG). It is the most
general form of the normal variance-mean mixture family.
The marginal distribution :math:`f(x)` is NOT an exponential family.
The joint distribution :math:`f(x, y)` IS an exponential family and can
be accessed via the ``.joint`` property.
Parameters
----------
None. Use factory methods to create instances.
Attributes
----------
_joint : JointGeneralizedHyperbolic
The underlying joint distribution.
Examples
--------
>>> # 1D case
>>> gh = GeneralizedHyperbolic.from_classical_params(
... mu=np.array([0.0]),
... gamma=np.array([0.5]),
... sigma=np.array([[1.0]]),
... p=1.0,
... a=1.0,
... b=1.0
... )
>>> x = gh.rvs(size=1000, random_state=42) # Samples X only
>>> # Access joint distribution
>>> gh.joint.pdf(x, y) # Joint PDF f(x, y)
>>> gh.joint.natural_params # Natural parameters (exponential family)
>>> # Fit with EM algorithm and regularization
>>> gh = GeneralizedHyperbolic().fit(
... X, max_iter=100,
... regularization='det_sigma_one' # or 'sigma_diagonal_one', 'fix_p', 'none'
... )
See Also
--------
JointGeneralizedHyperbolic : Joint distribution (exponential family)
GeneralizedInverseGaussian : The mixing distribution for Y
VarianceGamma : Special case with Gamma mixing (b → 0)
NormalInverseGaussian : Special case with p = -1/2
NormalInverseGamma : Special case with a → 0
Notes
-----
The GH distribution is widely used in finance for modeling asset returns.
It includes many important distributions as special cases:
- **Variance Gamma (VG)**: :math:`b \\to 0, p > 0` (Gamma mixing)
- **Normal-Inverse Gaussian (NIG)**: :math:`p = -1/2` (IG mixing)
- **Normal-Inverse Gamma (NInvG)**: :math:`a \\to 0, p < 0` (InvGamma mixing)
- **Hyperbolic**: :math:`p = 1`
- **Student-t**: :math:`p = -\\nu/2, a \\to 0, b = \\nu` gives Student-t with ν d.f.
The model is not identifiable since the parameter sets
:math:`(\\mu, \\gamma/c, \\Sigma/c, p, c \\cdot b, a/c)` give the same distribution
for any :math:`c > 0`. Use regularization to fix this.
Available regularization methods:
- ``'det_sigma_one'``: Fix :math:`|\\Sigma| = 1` (recommended)
- ``'sigma_diagonal_one'``: Fix :math:`\\Sigma_{11} = 1`
- ``'fix_p'``: Fix :math:`p` to a specific value (e.g., -0.5 for NIG)
- ``'none'``: No regularization
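
    The invariance can be checked numerically (illustrative sketch; reuses
    ``gh`` and ``x`` from the Examples above):

    >>> c = 2.0
    >>> gh2 = GeneralizedHyperbolic.from_classical_params(
    ...     mu=np.array([0.0]), gamma=np.array([0.5]) / c,
    ...     sigma=np.array([[1.0]]) / c, p=1.0, a=1.0 / c, b=1.0 * c
    ... )
    >>> np.allclose(gh.logpdf(x), gh2.logpdf(x))  # doctest: +SKIP
    True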
"""
# ========================================================================
# Joint distribution factory
# ========================================================================
def _create_joint_distribution(self) -> JointNormalMixture:
"""Create the underlying JointGeneralizedHyperbolic instance."""
return JointGeneralizedHyperbolic()
# ========================================================================
# Marginal PDF
# ========================================================================
def _marginal_logpdf(self, x: ArrayLike) -> NDArray:
"""
Compute marginal log PDF: log f(x) (vectorized with Cholesky).
The marginal PDF for GH has a closed form involving Bessel K functions:
.. math::
f(x) = C \\cdot (b + q(x))^{(p - d/2)/2} (a + \\tilde{q})^{(d/2 - p)/2}
\\cdot \\frac{K_{p - d/2}(\\sqrt{(b + q(x))(a + \\tilde{q})})}{
K_p(\\sqrt{ab})} \\cdot e^{(x-\\mu)^T\\Lambda\\gamma}
where:
- :math:`q(x) = (x-\\mu)^T \\Lambda (x-\\mu)` (Mahalanobis distance squared)
- :math:`\\tilde{q} = \\gamma^T \\Lambda \\gamma`
- :math:`C` is the normalizing constant
Parameters
----------
x : array_like
Points at which to evaluate log PDF. Shape (d,) or (n, d).
Returns
-------
logpdf : ndarray
Log PDF values.
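
        Notes
        -----
        A numerical cross-check of the closed form (illustrative sketch for
        d = 1; integrates the joint density over y with ``scipy``):

        >>> from scipy.integrate import quad
        >>> x0 = np.array([0.3])
        >>> marg, _ = quad(lambda y: gh.joint.pdf(x0, y), 0, np.inf)  # doctest: +SKIP
        >>> np.isclose(marg, np.exp(gh._marginal_logpdf(x0)))         # doctest: +SKIP
        True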
"""
x = np.asarray(x)
mu = self._joint._mu
gamma = self._joint._gamma
classical = self.joint.classical_params
p = classical['p']
a = classical['a']
b = classical['b']
d = self.d
# Handle single point vs multiple points
if x.ndim == 1:
x = x.reshape(1, -1)
single_point = True
else:
single_point = False
n = x.shape[0]
# Get cached L_Sigma_inv and log_det_Sigma
L_inv = self._joint.L_Sigma_inv
logdet_Sigma = self._joint.log_det_Sigma
# Transform data: z = L^{-1}(x - μ)
# Mahalanobis distance: q(x) = ||z||^2 = (x-μ)^T Σ^{-1} (x-μ)
diff = x - mu # (n, d)
z = L_inv @ diff.T # (d, n)
q = np.sum(z ** 2, axis=0) # (n,)
# Transform gamma: gamma_z = L^{-1} γ
gamma_z = L_inv @ gamma # (d,)
gamma_quad = np.dot(gamma_z, gamma_z) # γ^T Σ^{-1} γ
# Linear term: (x-μ)^T Σ^{-1} γ = z.T @ gamma_z
linear = z.T @ gamma_z # (n,)
# Constants
sqrt_ab = np.sqrt(a * b)
nu = p - d / 2 # Order of Bessel function for marginal
# Log Bessel K_p(√ab) - constant for all x
log_K_p = log_kv(p, sqrt_ab)
# Vectorized log PDF computation:
# log f(x) = (p/2)(log a - log b) - (d/2) log(2π) - (1/2) log|Σ| - log K_p(√ab)
# + (ν/2) log(b + q) + (-ν/2) log(a + γ^T Λ γ)
# + log K_{p-d/2}(√((b+q)(a+γ^TΛγ))) + (x-μ)^T Λ γ
# Arguments for Bessel function
arg1 = b + q # (n,)
arg2 = a + gamma_quad # scalar
eta = np.sqrt(arg1 * arg2) # (n,)
# Log Bessel K_{p-d/2}(eta) - vectorized
log_K_nu = log_kv(nu, eta) # (n,)
# Constant part (same for all samples)
log_const = (0.5 * p * (np.log(a) - np.log(b)) -
0.5 * d * np.log(2 * np.pi) -
0.5 * logdet_Sigma - log_K_p +
0.5 * (-nu) * np.log(arg2))
# Variable part (depends on x)
logpdf = log_const + 0.5 * nu * np.log(arg1) + log_K_nu + linear
if single_point:
return float(logpdf[0])
return logpdf
# ========================================================================
# EM Algorithm with Regularization
# ========================================================================
def fit(
self,
X: ArrayLike,
*,
max_iter: int = 100,
tol: float = 1e-2,
verbose: int = 0,
regularization: Union[str, Callable] = 'det_sigma_one',
regularization_params: Optional[Dict] = None,
random_state: Optional[int] = None,
fix_tail: bool = False,
) -> 'GeneralizedHyperbolic':
"""
Fit distribution to marginal data X using the EM algorithm.
Since the marginal distribution is not an exponential family, we use
the EM algorithm treating Y as latent. The joint distribution (X, Y)
is an exponential family, which makes the EM algorithm tractable.
Convergence is checked based on the relative change in the normal
parameters :math:`(\\mu, \\gamma, \\Sigma)`, which are more stable than
the GIG parameters. Log-likelihood is optionally displayed per
iteration when ``verbose >= 1``.
Parameters
----------
X : array_like
Observed data, shape (n_samples, d) or (n_samples,) for d=1.
max_iter : int, optional
Maximum number of EM iterations. Default is 100.
tol : float, optional
Convergence tolerance for relative parameter change (based on
:math:`\\mu, \\gamma, \\Sigma`). Default is 1e-4.
verbose : int, optional
Verbosity level. 0 = silent, 1 = progress with llh,
2 = detailed with parameter norms. Default is 0.
regularization : str or callable, optional
Regularization method to use. Options:
- ``'det_sigma_one'``: Fix |Σ| = 1 (recommended, default)
- ``'sigma_diagonal_one'``: Fix Σ₁₁ = 1
- ``'fix_p'``: Fix p to a specific value (use regularization_params)
- ``'none'``: No regularization
- Or provide a custom callable with signature:
``f(mu, gamma, sigma, p, a, b, d) -> dict``
regularization_params : dict, optional
Additional parameters for the regularization method.
For 'fix_p': ``{'p_fixed': -0.5}`` to fix p = -0.5 (NIG).
random_state : int, optional
Random seed for initialization.
fix_tail : bool, optional
If True, do not update GIG parameters (p, a, b) during fitting.
Useful for fitting special cases like VG, NIG, NInvG. Default False.
Returns
-------
self : GeneralizedHyperbolic
Fitted distribution (returns self for method chaining).
Notes
-----
The EM algorithm alternates between:
**E-step**: Compute conditional expectations given current parameters:
.. math::
E[Y^{-1}|X], E[Y|X], E[\\log Y|X]
**M-step**: Update parameters using closed-form solutions for normal
parameters and numerical optimization for GIG parameters.
After each M-step, regularization is applied to ensure identifiability.
The recommended regularization is :math:`|\\Sigma| = 1`, which doesn't
affect EM convergence (see em_algorithm.rst for proof).
The initial parameters are obtained by running a short NIG EM
(5 iterations) to get a reasonable starting point for
:math:`(\\mu, \\gamma, \\Sigma)` and the GIG parameters.
Examples
--------
>>> # Default regularization (|Σ| = 1)
>>> gh = GeneralizedHyperbolic().fit(X)
>>> # Fix p = -0.5 (NIG)
>>> gh = GeneralizedHyperbolic().fit(
... X,
... regularization='fix_p',
... regularization_params={'p_fixed': -0.5}
... )
>>> # Custom regularization
>>> def my_regularize(mu, gamma, sigma, p, a, b, d):
... return {'mu': mu, 'gamma': gamma, 'sigma': sigma,
... 'p': p, 'a': a, 'b': 1.0} # Fix b = 1
>>> gh = GeneralizedHyperbolic().fit(X, regularization=my_regularize)
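
        >>> # Keep the GIG (tail) parameters fixed; update only (μ, γ, Σ)
        >>> # (illustrative; useful for VG/NIG/NInvG-style fits)
        >>> gh = GeneralizedHyperbolic().fit(X, fix_tail=True)  # doctest: +SKIP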
"""
X = np.asarray(X)
# Handle 1D X
if X.ndim == 1:
X = X.reshape(-1, 1)
n_samples, d = X.shape
# Initialize joint distribution if needed
if self._joint is None:
self._joint = self._create_joint_distribution()
self._joint._d = d
# Get regularization function
if isinstance(regularization, str):
if regularization not in REGULARIZATION_METHODS:
raise ValueError(
f"Unknown regularization method: {regularization}. "
f"Available: {list(REGULARIZATION_METHODS.keys())}"
)
regularize_fn = REGULARIZATION_METHODS[regularization]
elif callable(regularization):
regularize_fn = regularization
else:
raise ValueError("regularization must be str or callable")
regularization_params = regularization_params or {}
# Initialize parameters using NIG warm start
if not self._joint._fitted:
self._initialize_params(X, random_state)
if verbose > 0:
init_ll = np.mean(self.logpdf(X))
print(f"Initial log-likelihood: {init_ll:.6f}")
def _apply_regularization():
"""Apply regularization to current parameters."""
classical = self.classical_params
log_det_sigma = self._joint.log_det_Sigma
if regularization == 'fix_p':
regularized = regularize_fn(
classical['mu'], classical['gamma'], classical['sigma'],
classical['p'], classical['a'], classical['b'], d,
**regularization_params
)
elif regularization == 'det_sigma_one':
regularized = regularize_fn(
classical['mu'], classical['gamma'], classical['sigma'],
classical['p'], classical['a'], classical['b'], d,
log_det_sigma=log_det_sigma
)
else:
regularized = regularize_fn(
classical['mu'], classical['gamma'], classical['sigma'],
classical['p'], classical['a'], classical['b'], d
)
# Sanity check: clamp degenerate GIG parameters
a_reg = regularized['a']
b_reg = regularized['b']
if a_reg < 1e-6 or b_reg < 1e-6 or a_reg > 1e6 or b_reg > 1e6:
regularized['a'] = np.clip(a_reg, 1e-6, 1e6)
regularized['b'] = np.clip(b_reg, 1e-6, 1e6)
self._joint.set_classical_params(**regularized)
# EM iterations
for iteration in range(max_iter):
# Apply regularization BEFORE E-step (following legacy)
_apply_regularization()
# Save regularized parameters for convergence check
classical = self.classical_params
prev_mu = classical['mu'].copy()
prev_gamma = classical['gamma'].copy()
prev_sigma = classical['sigma'].copy()
# E-step: compute conditional expectations
cond_exp = self._conditional_expectation_y_given_x(X)
# M-step: update parameters
self._m_step(X, cond_exp, fix_tail=fix_tail)
# Apply regularization AFTER M-step to ensure constraints hold
_apply_regularization()
# Check convergence using relative parameter change on (mu, gamma, Sigma)
new_classical = self.classical_params
mu_new = new_classical['mu']
gamma_new = new_classical['gamma']
sigma_new = new_classical['sigma']
mu_norm = np.linalg.norm(mu_new - prev_mu)
mu_denom = max(np.linalg.norm(prev_mu), 1e-10)
rel_mu = mu_norm / mu_denom
gamma_norm = np.linalg.norm(gamma_new - prev_gamma)
gamma_denom = max(np.linalg.norm(prev_gamma), 1e-10)
rel_gamma = gamma_norm / gamma_denom
sigma_norm = np.linalg.norm(sigma_new - prev_sigma, 'fro')
sigma_denom = max(np.linalg.norm(prev_sigma, 'fro'), 1e-10)
rel_sigma = sigma_norm / sigma_denom
max_rel_change = max(rel_mu, rel_gamma, rel_sigma)
if verbose > 0:
ll = np.mean(self.logpdf(X))
print(f"Iteration {iteration + 1}: log-likelihood = {ll:.6f}")
if verbose > 1:
print(f" rel_change: mu={rel_mu:.2e}, gamma={rel_gamma:.2e}, "
f"Sigma={rel_sigma:.2e}")
if max_rel_change < tol:
if verbose > 0:
print('Converged')
self.n_iter_ = iteration + 1
self._fitted = self._joint._fitted
self._invalidate_cache()
return self
if verbose > 0:
print('Not converged (max iterations reached)')
self.n_iter_ = max_iter
self._fitted = self._joint._fitted
self._invalidate_cache()
return self
def _initialize_params(
self,
X: NDArray,
random_state: Optional[int] = None
) -> None:
"""
Initialize GH parameters using a short NIG EM warm start.
Runs a Normal-Inverse Gaussian EM for a few iterations to obtain
reasonable initial values for :math:`(\\mu, \\gamma, \\Sigma)` and
the GIG parameters :math:`(p, a, b)`.
Parameters
----------
X : ndarray
Data array, shape (n_samples, d).
random_state : int, optional
Random seed.
"""
import warnings
from .normal_inverse_gaussian import NormalInverseGaussian
n, d = X.shape
try:
# Use NIG with a few EM iterations to get a warm start
with warnings.catch_warnings():
warnings.simplefilter("ignore")
nig = NormalInverseGaussian()
nig.fit(X, max_iter=5, verbose=0, random_state=random_state)
nig_params = nig.classical_params
mu = nig_params['mu']
gamma = nig_params['gamma']
Sigma = nig_params['sigma']
delta = nig_params['delta']
eta = nig_params['eta']
# Convert NIG (IG) params to GIG params:
# IG(δ, η) corresponds to GIG(p=-0.5, a=η/δ², b=η)
p = -0.5
a = eta / (delta ** 2)
b = eta
# Clamp GIG parameters to reasonable range
a = np.clip(a, 1e-6, 1e6)
b = np.clip(b, 1e-6, 1e6)
except Exception:
# Fallback: method of moments initialization
mu = np.mean(X, axis=0)
Sigma = np.cov(X, rowvar=False)
if d == 1:
Sigma = np.array([[Sigma]])
min_eig = np.linalg.eigvalsh(Sigma).min()
if min_eig < 1e-8:
Sigma = Sigma + (1e-8 - min_eig + 1e-8) * np.eye(d)
gamma = np.zeros(d)
p = 1.0
a = 1.0
b = 1.0
# Set initial parameters
self._joint.set_classical_params(
mu=mu, gamma=gamma, sigma=Sigma, p=p, a=a, b=b
)
def _m_step(
self,
X: NDArray,
cond_exp: Dict[str, NDArray],
fix_tail: bool = False
) -> None:
"""
M-step: update parameters given conditional expectations.
Parameters
----------
X : ndarray
Data array, shape (n_samples, d).
cond_exp : dict
Dictionary with keys 'E_Y', 'E_inv_Y', 'E_log_Y'.
fix_tail : bool, optional
If True, do not update GIG parameters (p, a, b). Default False.
"""
from normix.distributions.univariate import GeneralizedInverseGaussian
n, d = X.shape
E_Y = cond_exp['E_Y']
E_inv_Y = cond_exp['E_inv_Y']
E_log_Y = cond_exp['E_log_Y']
# Average conditional expectations (sufficient statistics for GIG)
# Following legacy naming: s1 = E[1/Y], s2 = E[Y], s3 = E[log Y]
s1 = np.mean(E_inv_Y) # E[E[1/Y|X]]
s2 = np.mean(E_Y) # E[E[Y|X]]
s3 = np.mean(E_log_Y) # E[E[log Y|X]]
# Weighted sums for normal parameters
# s4 = E[X], s5 = E[X/Y], s6 = E[XX^T/Y]
s4 = np.mean(X, axis=0) # E[X]
s5 = np.mean(X * E_inv_Y[:, np.newaxis], axis=0) # E[X/Y]
s6 = np.einsum('ij,ik,i->jk', X, X, E_inv_Y) / n # E[XX^T/Y]
# Get current GIG parameters for initialization and fallback
current = self.joint.classical_params
# Update GIG parameters if not fixed
if not fix_tail:
gig = GeneralizedInverseGaussian()
# Sufficient statistics for GIG: [E[log Y], E[1/Y], E[Y]]
gig_eta = np.array([s3, s1, s2])
# Use current GIG parameters as initial point
current_gig_theta = np.array([
current['p'] - 1,
-current['b'] / 2,
-current['a'] / 2
])
try:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
gig.set_expectation_params(gig_eta, theta0=current_gig_theta)
gig_classical = gig.classical_params
p_new = gig_classical.p
a_new = gig_classical.a
b_new = gig_classical.b
# Sanity check: reject if parameters are extreme or degenerate
# This prevents the optimization from diverging
if (abs(p_new) > 50 or a_new > 1e10 or b_new > 1e10 or
a_new < 1e-10 or b_new < 1e-10):
# Keep current parameters
p = current['p']
a = current['a']
b = current['b']
else:
p = p_new
a = a_new
b = b_new
except Exception:
# Fallback: keep current GIG parameters
p = current['p']
a = current['a']
b = current['b']
else:
# Keep current GIG parameters
p = current['p']
a = current['a']
b = current['b']
# M-step for normal parameters (closed-form, following legacy)
# mu = (s4 - s2 * s5) / (1 - s1 * s2)
# gamma = (s5 - s1 * s4) / (1 - s1 * s2)
        denom = 1.0 - s1 * s2
        if abs(denom) < 1e-10:
            # Degenerate case (s1 * s2 ≈ 1): fall back to a symmetric fit
            # with gamma = 0
            mu = s4 / s2 if s2 > 0 else s4
            gamma = np.zeros(d)
        else:
            mu = (s4 - s2 * s5) / denom
            gamma = (s5 - s1 * s4) / denom
else:
mu = (s4 - s2 * s5) / denom
gamma = (s5 - s1 * s4) / denom
# Sigma = -s5 μ^T - μ s5^T + s6 + s1 μ μ^T - s2 γ γ^T
# Following legacy code exactly
Sigma = -np.outer(s5, mu)
Sigma = Sigma + Sigma.T + s6 + s1 * np.outer(mu, mu) - s2 * np.outer(gamma, gamma)
# Symmetrize and ensure positive definiteness
Sigma = (Sigma + Sigma.T) / 2
min_eig = np.linalg.eigvalsh(Sigma).min()
if min_eig < 1e-8:
Sigma = Sigma + (1e-8 - min_eig + 1e-8) * np.eye(d)
# Compute Cholesky factor of Sigma once and cache it
from scipy.linalg import cholesky
L_Sigma = cholesky(Sigma, lower=True)
# Bound GIG parameters
a = max(a, 1e-6)
b = max(b, 1e-6)
# Update joint distribution and cache L_Sigma
self._joint.set_classical_params(
mu=mu, gamma=gamma, sigma=Sigma, p=p, a=a, b=b
)
# Cache the Cholesky factor (set_classical_params clears cache, so set after)
self._joint.set_L_Sigma(L_Sigma)
# ========================================================================
# Conditional Expectations (E-step helper)
# ========================================================================
def _conditional_expectation_y_given_x(
self,
X: ArrayLike
) -> Dict[str, NDArray]:
"""
Compute conditional expectations E[Y^α | X] for the E-step (vectorized).
The conditional distribution Y | X = x is GIG with parameters:
.. math::
Y | X = x \\sim \\text{GIG}\\left(p - \\frac{d}{2}, \\,
a + \\gamma^T \\Sigma^{-1} \\gamma, \\,
b + (x-\\mu)^T \\Sigma^{-1}(x-\\mu)\\right)
Parameters
----------
X : array_like
Data points, shape (n, d) or (d,).
Returns
-------
expectations : dict
Dictionary with keys:
- 'E_Y': E[Y | X], shape (n,)
- 'E_inv_Y': E[1/Y | X], shape (n,)
- 'E_log_Y': E[log Y | X], shape (n,)
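
        Notes
        -----
        Tower-property sanity check (illustrative sketch; Monte Carlo, so only
        approximate). Averaging ``E[Y | X]`` over draws of ``X`` should recover
        the unconditional GIG mean, computed as in :meth:`mean`:

        >>> E_Y = np.sqrt(b / a) * np.exp(
        ...     log_kv(p + 1, np.sqrt(a * b)) - log_kv(p, np.sqrt(a * b)))  # doctest: +SKIP
        >>> x = gh.rvs(size=100000, random_state=0)                # doctest: +SKIP
        >>> ce = gh._conditional_expectation_y_given_x(x)          # doctest: +SKIP
        >>> np.isclose(np.mean(ce['E_Y']), E_Y, rtol=0.05)         # doctest: +SKIP
        True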
"""
from normix.utils import kv_ratio
X = np.asarray(X)
if X.ndim == 1:
X = X.reshape(1, -1)
single_point = True
else:
single_point = False
mu = self._joint._mu
gamma = self._joint._gamma
classical = self.joint.classical_params
p = classical['p']
a = classical['a']
b = classical['b']
d = self.d
# Get cached L_Sigma_inv
L_inv = self._joint.L_Sigma_inv
# Transform data: z = L^{-1}(x - μ)
# Mahalanobis distance: q(x) = (x-μ)^T Σ^{-1} (x-μ) = ||z||^2
diff = X - mu # (n, d)
z = L_inv @ diff.T # (d, n)
# Mahalanobis distances: q(x) = (x-μ)^T Σ^{-1} (x-μ) = ||z||^2
q = np.sum(z ** 2, axis=0) # (n,)
# Transform gamma: L^{-1} γ
gamma_z = L_inv @ gamma # (d,)
gamma_quad = np.dot(gamma_z, gamma_z) # γ^T Σ^{-1} γ
# Conditional GIG parameters (vectorized)
p_cond = p - d / 2 # scalar
a_cond = a + gamma_quad # scalar (same for all x)
b_cond = b + q # (n,) - varies with x
# delta = sqrt(b_cond / a_cond), eta = sqrt(a_cond * b_cond)
delta = np.sqrt(b_cond / a_cond) # (n,)
eta = np.sqrt(a_cond * b_cond) # (n,)
# E[Y | X] = delta * K_{p_cond+1}(eta) / K_{p_cond}(eta)
E_Y = delta * kv_ratio(p_cond + 1, p_cond, eta)
# E[1/Y | X] = (1/delta) * K_{p_cond-1}(eta) / K_{p_cond}(eta)
E_inv_Y = kv_ratio(p_cond - 1, p_cond, eta) / delta
# E[log Y | X] = ∂/∂p log(K_p(eta)) + log(delta)
# Numerical derivative for ∂/∂p log(K_p(eta))
eps = 1e-5
d_log_kv_dp = (log_kv(p_cond + eps, eta) - log_kv(p_cond - eps, eta)) / (2 * eps)
E_log_Y = d_log_kv_dp + np.log(delta)
if single_point:
return {
'E_Y': float(E_Y[0]),
'E_inv_Y': float(E_inv_Y[0]),
'E_log_Y': float(E_log_Y[0])
}
return {
'E_Y': E_Y,
'E_inv_Y': E_inv_Y,
'E_log_Y': E_log_Y
}
# ========================================================================
# Special Case Constructors
# ========================================================================
@classmethod
def as_variance_gamma(
cls,
mu: ArrayLike,
gamma: ArrayLike,
sigma: ArrayLike,
shape: float,
rate: float
) -> 'GeneralizedHyperbolic':
"""
Create a GH distribution equivalent to Variance Gamma.
VG is GH with b → 0 (Gamma mixing).
Parameters
----------
mu, gamma, sigma : arrays
Normal parameters.
shape : float
Gamma shape parameter α.
rate : float
Gamma rate parameter β.
Returns
-------
gh : GeneralizedHyperbolic
GH distribution equivalent to VG.
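
        Examples
        --------
        Illustrative parameter mapping (Gamma(α=2, β=1.5) → GIG(p=2, a=3);
        ``b`` is set to a small positive value to approximate the Gamma limit):

        >>> gh = GeneralizedHyperbolic.as_variance_gamma(
        ...     mu=[0.0], gamma=[0.1], sigma=[[1.0]],
        ...     shape=2.0, rate=1.5)               # doctest: +SKIP
        >>> gh.joint.classical_params['p']         # doctest: +SKIP
        2.0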
"""
mu = np.asarray(mu).flatten()
gamma_arr = np.asarray(gamma).flatten()
sigma = np.asarray(sigma)
# VG: Y ~ Gamma(α, β)
# GH: Y ~ GIG(p, a, b) with b → 0, p = α, a = 2β
# Small b to approximate Gamma limit
b_small = 1e-10
return cls.from_classical_params(
mu=mu, gamma=gamma_arr, sigma=sigma,
p=shape, a=2 * rate, b=b_small
)
@classmethod
def as_normal_inverse_gaussian(
cls,
mu: ArrayLike,
gamma: ArrayLike,
sigma: ArrayLike,
delta: float,
eta: float
) -> 'GeneralizedHyperbolic':
"""
Create a GH distribution equivalent to Normal-Inverse Gaussian.
NIG is GH with p = -1/2 (Inverse Gaussian mixing).
Parameters
----------
mu, gamma, sigma : arrays
Normal parameters.
delta : float
IG mean parameter.
eta : float
IG shape parameter.
Returns
-------
gh : GeneralizedHyperbolic
GH distribution equivalent to NIG.
"""
mu = np.asarray(mu).flatten()
gamma_arr = np.asarray(gamma).flatten()
sigma = np.asarray(sigma)
# NIG: Y ~ IG(δ, η)
# GIG: p = -1/2, a = η/δ², b = η
a = eta / (delta ** 2)
b = eta
return cls.from_classical_params(
mu=mu, gamma=gamma_arr, sigma=sigma,
p=-0.5, a=a, b=b
)
@classmethod
def as_normal_inverse_gamma(
cls,
mu: ArrayLike,
gamma: ArrayLike,
sigma: ArrayLike,
shape: float,
rate: float
) -> 'GeneralizedHyperbolic':
"""
Create a GH distribution equivalent to Normal-Inverse Gamma.
NInvG is GH with a → 0 (Inverse Gamma mixing).
Parameters
----------
mu, gamma, sigma : arrays
Normal parameters.
shape : float
InverseGamma shape parameter α.
rate : float
InverseGamma rate parameter β.
Returns
-------
gh : GeneralizedHyperbolic
GH distribution equivalent to NInvG.
"""
mu = np.asarray(mu).flatten()
gamma_arr = np.asarray(gamma).flatten()
sigma = np.asarray(sigma)
# NInvG: Y ~ InvGamma(α, β)
# GIG: a → 0, p = -α, b = 2β
a_small = 1e-10
return cls.from_classical_params(
mu=mu, gamma=gamma_arr, sigma=sigma,
p=-shape, a=a_small, b=2 * rate
)
# ========================================================================
# Moments
# ========================================================================
def mean(self) -> NDArray:
"""
Mean of the marginal distribution: :math:`E[X] = \\mu + \\gamma E[Y]`.
Returns
-------
mean : ndarray
Mean vector, shape (d,).
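
        Examples
        --------
        Illustrative Monte Carlo check (approximate by construction):

        >>> x = gh.rvs(size=200000, random_state=0)             # doctest: +SKIP
        >>> np.allclose(x.mean(axis=0), gh.mean(), atol=0.05)   # doctest: +SKIP
        True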
"""
mu = self._joint._mu
gamma = self._joint._gamma
classical = self.joint.classical_params
p = classical['p']
a = classical['a']
b = classical['b']
# E[Y] from GIG
sqrt_ab = np.sqrt(a * b)
sqrt_b_over_a = np.sqrt(b / a)
log_kv_p = log_kv(p, sqrt_ab)
log_kv_pp1 = log_kv(p + 1, sqrt_ab)
E_Y = sqrt_b_over_a * np.exp(log_kv_pp1 - log_kv_p)
return mu + gamma * E_Y
def var(self) -> NDArray:
"""
Variance of the marginal distribution (diagonal elements of covariance).
.. math::
\\text{Cov}[X] = E[Y] \\Sigma + \\text{Var}[Y] \\gamma\\gamma^T
Returns
-------
var : ndarray
Variance vector, shape (d,).
"""
cov = self.cov()
return np.diag(cov)
def cov(self) -> NDArray:
"""
Covariance matrix of the marginal distribution.
.. math::
\\text{Cov}[X] = E[Y] \\Sigma + \\text{Var}[Y] \\gamma\\gamma^T
Returns
-------
cov : ndarray
Covariance matrix, shape (d, d).
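
        Examples
        --------
        Consistency with :meth:`var` (exact identity; illustrative):

        >>> np.allclose(np.diag(gh.cov()), gh.var())  # doctest: +SKIP
        True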
"""
gamma = self._joint._gamma
Sigma = self._joint._L_Sigma @ self._joint._L_Sigma.T
classical = self.joint.classical_params
p = classical['p']
a = classical['a']
b = classical['b']
# E[Y] and E[Y²] from GIG
sqrt_ab = np.sqrt(a * b)
sqrt_b_over_a = np.sqrt(b / a)
log_kv_p = log_kv(p, sqrt_ab)
log_kv_pp1 = log_kv(p + 1, sqrt_ab)
log_kv_pp2 = log_kv(p + 2, sqrt_ab)
E_Y = sqrt_b_over_a * np.exp(log_kv_pp1 - log_kv_p)
E_Y2 = (b / a) * np.exp(log_kv_pp2 - log_kv_p)
Var_Y = E_Y2 - E_Y ** 2
return E_Y * Sigma + Var_Y * np.outer(gamma, gamma)
# ========================================================================
# String representation
# ========================================================================
def __repr__(self) -> str:
"""String representation."""
if not self._fitted:
return "GeneralizedHyperbolic(not fitted)"
classical = self.classical_params
d = self.d
p = classical['p']
a = classical['a']
b = classical['b']
if d == 1:
mu = float(classical['mu'][0])
gamma_val = float(classical['gamma'][0])
return f"GeneralizedHyperbolic(μ={mu:.3f}, γ={gamma_val:.3f}, p={p:.3f}, a={a:.3f}, b={b:.3f})"
else:
return f"GeneralizedHyperbolic(d={d}, p={p:.3f}, a={a:.3f}, b={b:.3f})"