"""GLM-style solvers used by Generalized Riesz Regression.
The library uses a simple finite-dimensional model:
v(x) = phi(x)^T beta,
and a generator-specific link that maps the linear predictor ``v`` to a Riesz
representer ``alpha(x)``.
The GRR objective for beta can be written as
min_beta E[ g*(v(X)) - m(X, v) ] + penalty(beta),
where ``g*`` is the convex conjugate of the generator and ``m(X,v)`` is linear
in ``v``.
We solve the resulting convex (often smooth) problem with L-BFGS-B.
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy import optimize
from .basis import Basis
from .functionals import LinearFunctional
from .generators import BregmanGenerator
def _as_2d(X: ArrayLike) -> NDArray[np.float64]:
X_ = np.asarray(X, dtype=float)
if X_.ndim != 2:
raise ValueError(f"X must be 2D. Got shape {X_.shape}.")
return X_
def _as_1d(y: ArrayLike, *, n: int, name: str) -> NDArray[np.float64]:
y_ = np.asarray(y, dtype=float).reshape(-1)
if y_.shape[0] != n:
raise ValueError(f"{name} must have length {n}. Got {y_.shape}.")
return y_
def _sigmoid(z: NDArray[np.float64]) -> NDArray[np.float64]:
# Numerically stable sigmoid
out = np.empty_like(z)
pos = z >= 0
out[pos] = 1.0 / (1.0 + np.exp(-z[pos]))
expz = np.exp(z[~pos])
out[~pos] = expz / (1.0 + expz)
return out
@dataclass
class FitResult:
beta: NDArray[np.float64]
success: bool
message: str
n_iter: int
class _Penalty:
def __init__(self, penalty: str | None, lam: float, p_norm: float | None):
self.penalty = None if penalty is None else str(penalty).lower()
self.lam = float(lam)
self.p_norm = 2.0 if p_norm is None else float(p_norm)
if self.lam < 0:
raise ValueError("lam must be >= 0")
# Convenience shorthand: allow strings like "l1.5" to mean an l_p penalty.
# This keeps the public API concise while still supporting general l_p.
if self.penalty is not None and self.penalty.startswith("l") and self.penalty not in {"l1", "l2", "lp", "l_p"}:
try:
p = float(self.penalty[1:])
self.penalty = "lp"
self.p_norm = p
except Exception:
# Fall through to the standard parser below.
pass
if self.penalty in {"l1", "lasso"}:
self.p_norm = 1.0
elif self.penalty in {"l2", "ridge"}:
self.p_norm = 2.0
elif self.penalty in {"lp", "l_p", "p"}:
if self.p_norm < 1.0:
raise ValueError("p_norm must be >= 1")
elif self.penalty in {None, "none", ""}:
self.penalty = None
else:
raise ValueError(f"Unknown penalty: {penalty}")
# Smoothing for the l1 gradient (subgradient at 0)
self._eps = 1e-8
def value(self, beta: NDArray[np.float64]) -> float:
if self.penalty is None or self.lam == 0.0:
return 0.0
if self.p_norm == 1.0:
return float(self.lam * np.sum(np.abs(beta)))
return float((self.lam / self.p_norm) * np.sum(np.abs(beta) ** self.p_norm))
def grad(self, beta: NDArray[np.float64]) -> NDArray[np.float64]:
if self.penalty is None or self.lam == 0.0:
return np.zeros_like(beta)
if self.p_norm == 1.0:
return self.lam * beta / np.sqrt(beta * beta + self._eps)
return self.lam * np.sign(beta) * (np.abs(beta) ** (self.p_norm - 1.0))
[docs]
class GRRGLM:
"""Finite-dimensional generalized Riesz regression (GLM form)."""
def __init__(
self,
*,
basis: Basis,
generator: BregmanGenerator,
functional: LinearFunctional,
penalty: str | None = "l2",
lam: float = 1e-3,
p_norm: float | None = None,
):
self.basis = basis
self.generator = generator
self.functional = functional
self.penalty = _Penalty(penalty, lam, p_norm)
self._Phi: NDArray[np.float64] | None = None
self._M: NDArray[np.float64] | None = None
self.beta_: NDArray[np.float64] | None = None
self.fit_result_: FitResult | None = None
def fit(
self,
X: ArrayLike,
*,
beta0: ArrayLike | None = None,
max_iter: int = 500,
tol: float = 1e-8,
verbose: bool = False,
) -> FitResult:
X_ = _as_2d(X)
self._Phi = np.asarray(self.basis(X_), dtype=float)
self._M = np.asarray(self.functional.m_basis_matrix(X_, self.basis), dtype=float)
n, p = self._Phi.shape
if self._M.shape != (n, p):
raise ValueError(
f"m_basis_matrix returned shape {self._M.shape}, expected {(n, p)}."
)
if beta0 is None:
beta0_ = np.zeros(p, dtype=float)
else:
beta0_ = np.asarray(beta0, dtype=float).reshape(-1)
if beta0_.shape[0] != p:
raise ValueError(f"beta0 must have length {p}. Got {beta0_.shape}.")
Phi = self._Phi
M = self._M
# Objective and gradient
big = 1e20
def fun(beta: NDArray[np.float64]) -> float:
v = Phi @ beta
try:
g_star, _ = self.generator.conjugate(X_, v)
except Exception:
return big
loss = float(np.mean(g_star - (M @ beta)))
return loss + self.penalty.value(beta)
def jac(beta: NDArray[np.float64]) -> NDArray[np.float64]:
v = Phi @ beta
try:
_, alpha = self.generator.conjugate(X_, v)
except Exception:
return np.zeros_like(beta)
grad = (alpha[:, None] * Phi - M).mean(axis=0)
return grad + self.penalty.grad(beta)
opts: dict = {"maxiter": int(max_iter), "ftol": float(tol)}
if verbose:
opts["iprint"] = 1
res = optimize.minimize(fun=fun, x0=beta0_, jac=jac, method="L-BFGS-B", options=opts)
beta_hat = np.asarray(res.x, dtype=float)
out = FitResult(
beta=beta_hat,
success=bool(res.success),
message=str(res.message),
n_iter=int(getattr(res, "nit", -1)),
)
self.beta_ = beta_hat
self.fit_result_ = out
return out
def predict_v(self, X: ArrayLike) -> NDArray[np.float64]:
if self.beta_ is None:
raise RuntimeError("Model is not fit.")
Phi = np.asarray(self.basis(_as_2d(X)), dtype=float)
return Phi @ self.beta_
def predict_alpha(self, X: ArrayLike) -> NDArray[np.float64]:
v = self.predict_v(X)
return self.generator.inv_grad(_as_2d(X), v)
[docs]
def derivative_alpha(self, X: ArrayLike, coordinate: int) -> NDArray[np.float64]:
"""Derivative of alpha(x) wrt x_coordinate.
Uses the identity grad_g(alpha(x)) = v(x) and the inverse function theorem:
g''(alpha) * d alpha/dx = d v/dx.
"""
if self.beta_ is None:
raise RuntimeError("Model is not fit.")
X_ = _as_2d(X)
dPhi = self.basis.derivative(X_, coordinate)
dv = dPhi @ self.beta_
alpha = self.predict_alpha(X_)
g2 = self.generator.grad2(X_, alpha)
return dv / g2
class OutcomeGLM:
"""Simple (penalized) outcome regression on top of a basis."""
def __init__(
self,
*,
basis: Basis,
link: str = "identity",
penalty: str | None = "l2",
lam: float = 1e-3,
p_norm: float | None = None,
):
self.basis = basis
self.link = str(link).lower()
if self.link not in {"identity", "logit"}:
raise ValueError("link must be 'identity' or 'logit'")
self.penalty = _Penalty(penalty, lam, p_norm)
self.theta_: NDArray[np.float64] | None = None
def fit(
self,
X: ArrayLike,
y: ArrayLike,
*,
theta0: ArrayLike | None = None,
max_iter: int = 500,
tol: float = 1e-8,
verbose: bool = False,
) -> FitResult:
X_ = _as_2d(X)
Phi = np.asarray(self.basis(X_), dtype=float)
n, p = Phi.shape
y_ = _as_1d(y, n=n, name="y")
if theta0 is None:
theta0_ = np.zeros(p, dtype=float)
else:
theta0_ = np.asarray(theta0, dtype=float).reshape(-1)
if theta0_.shape[0] != p:
raise ValueError(f"theta0 must have length {p}. Got {theta0_.shape}.")
# Closed form for identity + l2 (ridge)
if self.link == "identity" and self.penalty.penalty in {"l2", "ridge"}:
if self.penalty.lam == 0.0:
# Ordinary least squares (with pseudo-inverse)
theta = np.linalg.pinv(Phi) @ y_
elif p > n:
# Dual (kernel ridge / Woodbury) form: O(n^3) instead of O(p^3)
K = (Phi @ Phi.T) / n
theta = (Phi.T @ np.linalg.solve(K + self.penalty.lam * np.eye(n), y_)) / n
else:
A = (Phi.T @ Phi) / n + self.penalty.lam * np.eye(p)
b = (Phi.T @ y_) / n
theta = np.linalg.solve(A, b)
self.theta_ = np.asarray(theta, dtype=float)
return FitResult(beta=self.theta_, success=True, message="closed_form", n_iter=1)
def fun(theta: NDArray[np.float64]) -> float:
eta = Phi @ theta
if self.link == "identity":
resid = y_ - eta
loss = 0.5 * float(np.mean(resid * resid))
else:
# Bernoulli negative log-likelihood
# mean(log(1+exp(eta)) - y*eta)
loss = float(np.mean(np.logaddexp(0.0, eta) - y_ * eta))
return loss + self.penalty.value(theta)
def jac(theta: NDArray[np.float64]) -> NDArray[np.float64]:
eta = Phi @ theta
if self.link == "identity":
resid = y_ - eta
grad = -(Phi.T @ resid) / n
else:
p_hat = _sigmoid(eta)
grad = (Phi.T @ (p_hat - y_)) / n
return grad + self.penalty.grad(theta)
opts_: dict = {"maxiter": int(max_iter), "ftol": float(tol)}
if verbose:
opts_["iprint"] = 1
res = optimize.minimize(fun=fun, x0=theta0_, jac=jac, method="L-BFGS-B", options=opts_)
theta_hat = np.asarray(res.x, dtype=float)
self.theta_ = theta_hat
return FitResult(
beta=theta_hat,
success=bool(res.success),
message=str(res.message),
n_iter=int(getattr(res, "nit", -1)),
)
def predict_link(self, X: ArrayLike) -> NDArray[np.float64]:
if self.theta_ is None:
raise RuntimeError("OutcomeGLM is not fit.")
Phi = np.asarray(self.basis(_as_2d(X)), dtype=float)
return Phi @ self.theta_
def predict(self, X: ArrayLike) -> NDArray[np.float64]:
eta = self.predict_link(X)
if self.link == "identity":
return eta
return _sigmoid(eta)
def derivative(self, X: ArrayLike, coordinate: int) -> NDArray[np.float64]:
if self.theta_ is None:
raise RuntimeError("OutcomeGLM is not fit.")
X_ = _as_2d(X)
dPhi = self.basis.derivative(X_, coordinate)
deta = dPhi @ self.theta_
if self.link == "identity":
return deta
mu = self.predict(X_)
return mu * (1.0 - mu) * deta