Source code for genriesz.generators

"""Bregman generators for Generalized Riesz Regression.

In *genriesz*, generalized Riesz regression (GRR) fits a finite-dimensional model

    v(x) = phi(x)^T beta,

and uses a **link function** to map the linear predictor ``v`` to a
Riesz representer ``alpha``.

A (possibly regressor-dependent) **Bregman generator** is a convex function

    g(x, alpha),

with derivative (wrt ``alpha``) ``∂g(x, alpha)/∂alpha``. GRR uses the *canonical*
(automatic) link

    alpha(x) = (∂g(x, ·))^{-1}( v(x) ),

which is the key mechanism behind **automatic regressor balancing**.

This module provides:

- Built-in generator families:
  - :class:`SquaredGenerator` ("SQ")
  - :class:`UKLGenerator` ("UKL")
  - :class:`BKLGenerator` ("BKL")
  - :class:`BPGenerator` ("BP")
  - :class:`PUGenerator` ("PU")

- A flexible :class:`BregmanGenerator` that lets users specify an arbitrary
  generator ``g`` and (optionally) its derivatives. If derivatives are omitted,
  they are approximated numerically.

Notes
-----
The public interface expected by the generalized Riesz regression solvers is:

- ``alpha = inv_grad(X, v)``
- ``g_val = g(X, alpha)``
- ``g2 = grad2(X, alpha)`` (second derivative wrt ``alpha``; elementwise)
- ``(g_star, alpha) = conjugate(X, v)``

All evaluations are row-wise: ``X`` is (n, d) and outputs are 1D arrays of
length n.
"""

from __future__ import annotations

import inspect
import warnings
from typing import Callable

import numpy as np
from numpy.typing import ArrayLike, NDArray

BranchFn = Callable[[NDArray[np.float64]], int]


def _as_2d(X: ArrayLike, *, name: str = "X") -> NDArray[np.float64]:
    X_ = np.asarray(X, dtype=float)
    if X_.ndim != 2:
        raise ValueError(f"{name} must be 2D. Got shape {X_.shape}.")
    return X_


def _as_1d(v: ArrayLike, *, n: int, name: str = "v") -> NDArray[np.float64]:
    v_ = np.asarray(v, dtype=float).reshape(-1)
    if v_.shape[0] != n:
        raise ValueError(f"{name} must have length {n}. Got shape {v_.shape}.")
    return v_


class _RowwiseScalarFn:
    """Wrap a scalar function and provide vectorized or rowwise evaluation.

    The wrapped callable can have signature:

    - ``f(alpha)``
    - ``f(x, alpha)`` where ``x`` is a 1D regressor row
    - a vectorized form: ``f(alpha_array)`` or ``f(X, alpha_array)``

    The wrapper tries the vectorized call once and caches the outcome.
    """

    def __init__(self, func: Callable):
        self.func = func

        # Determine whether the function expects 1 or 2 positional args.
        try:
            sig = inspect.signature(func)
            n_pos = 0
            for p in sig.parameters.values():
                if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD):
                    n_pos += 1
            self._arity = 1 if n_pos <= 1 else 2
        except Exception:
            # If we cannot inspect, assume 2-arg form.
            self._arity = 2

        # None = unknown, True = vectorized works, False = rowwise only
        self._vectorized: bool | None = None

    def __call__(self, X: NDArray[np.float64], a: NDArray[np.float64]) -> NDArray[np.float64]:
        X = _as_2d(X)
        a = _as_1d(a, n=len(X), name="a")

        if self._vectorized is not False:
            try:
                if self._arity == 1:
                    out = self.func(a)
                else:
                    out = self.func(X, a)
                out_arr = np.asarray(out, dtype=float).reshape(-1)
                if out_arr.shape[0] == len(a):
                    self._vectorized = True
                    return out_arr
            except Exception:
                self._vectorized = False

        out = np.empty(len(a), dtype=float)
        if self._arity == 1:
            for i in range(len(a)):
                out[i] = float(self.func(float(a[i])))
        else:
            for i in range(len(a)):
                out[i] = float(self.func(np.asarray(X[i], dtype=float), float(a[i])))
        return out


[docs] class BregmanGenerator: """A Bregman generator with an (optional) automatic link. Parameters ---------- g: Generator function ``g(x, alpha)`` or ``g(alpha)``. grad: First derivative wrt alpha, ``∂g(x, alpha)/∂alpha``. If omitted, it is approximated by finite differences. inv_grad: Inverse derivative (link) ``alpha = (∂g)^{-1}(x, v)``. If omitted, it is computed by Newton iterations using ``grad`` and ``grad2``. grad2: Second derivative wrt alpha (elementwise). If omitted, it is approximated from ``g`` via a second-order finite difference. name: Display name. C: Optional domain parameter used by some generator families. The generic implementation uses it only as a soft domain guard. branch_fn: Optional branch selector returning 1 (positive) or 0 (negative). Built-in UKL/BP generators use this to choose the sign branch. Notes ----- The generic (user-specified) generator supports regressor-dependent generators via ``g(x, alpha)``. For performance and numerical stability, providing analytic ``grad`` and especially ``inv_grad`` is strongly recommended. """ def __init__( self, *, g: Callable | None = None, grad: Callable | None = None, inv_grad: Callable | None = None, grad2: Callable | None = None, name: str = "Custom", C: float = 0.0, branch_fn: BranchFn | None = None, finite_diff_eps: float = 1e-6, newton_max_iter: int = 60, newton_tol: float = 1e-10, ): self.name = str(name) self.C = float(C) self.branch_fn = branch_fn self._g = None if g is None else _RowwiseScalarFn(g) self._grad = None if grad is None else _RowwiseScalarFn(grad) self._inv_grad = None if inv_grad is None else _RowwiseScalarFn(inv_grad) self._grad2 = None if grad2 is None else _RowwiseScalarFn(grad2) self._eps = float(finite_diff_eps) self._newton_max_iter = int(newton_max_iter) self._newton_tol = float(newton_tol) # ------------------------------------------------------------------ # Compatibility helpers # ------------------------------------------------------------------
[docs] def as_generator(self) -> "BregmanGenerator": """For API compatibility with earlier drafts.""" return self
def evaluate_g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: return self.g(X, alpha) def evaluate_grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: return self.grad(X, alpha) def evaluate_inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: return self.inv_grad(X, v) # ------------------------------------------------------------------ # Internal utilities # ------------------------------------------------------------------ def _sign(self, X: NDArray[np.float64], v: NDArray[np.float64]) -> NDArray[np.float64]: """Return +1/-1 sign array for branch-wise generators.""" if self.branch_fn is None: return np.where(v >= 0.0, 1.0, -1.0) s = np.empty(len(v), dtype=float) for i in range(len(v)): s[i] = 1.0 if int(self.branch_fn(X[i])) == 1 else -1.0 return s def _require_g(self) -> _RowwiseScalarFn: if self._g is None: raise ValueError("This generator does not define g().") return self._g # ------------------------------------------------------------------ # Public interface required by generalized Riesz regression solvers # ------------------------------------------------------------------
[docs] def g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: """Evaluate g(x, alpha) row-wise.""" X_ = _as_2d(X) a_ = _as_1d(alpha, n=len(X_), name="alpha") gfn = self._require_g() return gfn(X_, a_)
[docs] def grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: """Evaluate the derivative ∂g/∂alpha row-wise.""" X_ = _as_2d(X) a_ = _as_1d(alpha, n=len(X_), name="alpha") if self._grad is not None: return self._grad(X_, a_) # Finite differences on g eps = self._eps gfn = self._require_g() return (gfn(X_, a_ + eps) - gfn(X_, a_ - eps)) / (2.0 * eps)
[docs] def grad2(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: """Evaluate the second derivative ∂²g/∂alpha² row-wise.""" X_ = _as_2d(X) a_ = _as_1d(alpha, n=len(X_), name="alpha") if self._grad2 is not None: return self._grad2(X_, a_) # Second-order finite difference on g eps = self._eps gfn = self._require_g() g_p = gfn(X_, a_ + eps) g_0 = gfn(X_, a_) g_m = gfn(X_, a_ - eps) return (g_p - 2.0 * g_0 + g_m) / (eps * eps)
[docs] def inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: """Inverse derivative map alpha = (∂g)^{-1}(x, v).""" X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") if self._inv_grad is not None: return self._inv_grad(X_, v_) # Automatic inversion via Newton iterations. # This is a generic fallback and may be slow or unstable. s = self._sign(X_, v_) # Heuristic initialization. alpha = v_.copy() if self.branch_fn is not None: alpha = s * np.abs(alpha) # Soft domain guard used by UKL/BP-like generators. if self.C > 0: alpha = s * np.maximum(np.abs(alpha), self.C + 1e-6) tol = self._newton_tol for _ in range(self._newton_max_iter): g1 = self.grad(X_, alpha) diff = g1 - v_ max_abs = float(np.max(np.abs(diff))) if not np.isfinite(max_abs): break if max_abs < tol: return alpha g2 = self.grad2(X_, alpha) g2 = np.asarray(g2, dtype=float) # Guard against non-positive curvature (should not happen for strictly convex g). g2 = np.where(np.isfinite(g2) & (g2 > 1e-12), g2, 1e-12) step = diff / g2 step = np.clip(step, -50.0, 50.0) alpha = alpha - step if self.branch_fn is not None: alpha = s * np.abs(alpha) if self.C > 0: alpha = s * np.maximum(np.abs(alpha), self.C + 1e-6) # If we reach here, Newton did not converge reliably. raise RuntimeError( "Failed to numerically invert grad. Provide inv_grad (and preferably grad/grad2) " "for this generator." )
[docs] def conjugate(self, X: ArrayLike, v: ArrayLike) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Return (g*(v), alpha) evaluated row-wise.""" X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") alpha = self.inv_grad(X_, v_) g_val = self.g(X_, alpha) g_star = v_ * alpha - g_val return g_star, alpha
[docs] class SquaredGenerator(BregmanGenerator): """Squared generator (SQ-Riesz). g(alpha) = (alpha - C)^2. This generator has no strict domain constraints and induces a linear link alpha = C + 0.5 * v. """ def __init__(self, C: float = 0.0): super().__init__(name="SQ", C=float(C), branch_fn=None)
[docs] def inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") return self.C + 0.5 * v_
[docs] def g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") return np.square(a - self.C)
[docs] def grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") return 2.0 * (a - self.C)
[docs] def grad2(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") return np.full_like(a, 2.0, dtype=float)
[docs] class UKLGenerator(BregmanGenerator): """Unnormalized KL generator (UKL-Riesz). g(alpha) = (|alpha| - C) log(|alpha| - C) - |alpha|, with |alpha| > C. The inverse gradient is branch-wise: - positive branch (sign +1): alpha = C + exp(v) - negative branch (sign -1): alpha = -C - exp(-v) If ``branch_fn`` is provided, it determines which branch is used for each observation. """ def __init__(self, C: float = 1.0, *, branch_fn: BranchFn | None = None): if float(C) < 0: raise ValueError("C must be >= 0") if branch_fn is None: warnings.warn( "UKLGenerator without branch_fn uses sign(v) to select the alpha branch. " "This is correct only when |alpha| > C + 1. " "For GRR with functionals that require negative alpha (e.g. ATE/ATT), " "provide branch_fn or use SquaredGenerator instead.", UserWarning, stacklevel=2, ) super().__init__(name="UKL", C=float(C), branch_fn=branch_fn)
[docs] def inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: """Branch-wise inverse gradient alpha = (g')^{-1}(v).""" X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") s = self._sign(X_, v_) # exp can underflow to 0 for large negative inputs; clip and floor. z = np.clip(s * v_, -700.0, 700.0) exp_term = np.exp(z) exp_term = np.maximum(exp_term, 1e-12) return s * (self.C + exp_term)
[docs] def g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.abs(a) - self.C t = np.maximum(t, 1e-12) return t * np.log(t) - np.abs(a)
[docs] def grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.abs(a) - self.C t = np.maximum(t, 1e-12) return np.sign(a) * np.log(t)
[docs] def grad2(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.abs(a) - self.C t = np.maximum(t, 1e-12) return 1.0 / t
[docs] class BPGenerator(BregmanGenerator): """Box-Power generator (BP-Riesz). A smooth family interpolating between UKL-like (small power) and squared-like (power near 1) behavior. We use the parametrization, for ``t = |alpha| - C``, g(alpha) = ( t^{1+omega} - (1+omega) t ) / omega, with domain ``|alpha| > C`` and ``omega > 0``. The derivative is g'(alpha) = sign(alpha) * (1+omega)/omega * ( t^omega - 1 ), so the inverse gradient (branch-wise) can be written as k = (1+omega)/omega u = 1 + sign * v / k (must be > 0) alpha = sign * ( C + u^{1/omega} ). As with UKL, ``branch_fn`` can be supplied to select the sign. """ def __init__( self, C: float = 1.0, *, omega: float = 0.5, power: float | None = None, branch_fn: BranchFn | None = None, ): if power is not None: omega = float(power) if float(C) < 0: raise ValueError("C must be >= 0") if float(omega) <= 0: raise ValueError("omega must be > 0") self.omega = float(omega) if branch_fn is None: warnings.warn( "BPGenerator without branch_fn uses sign(v) to select the alpha branch. " "This is correct only when |alpha| - C > 1. " "For GRR with functionals that require negative alpha (e.g. ATE/ATT), " "provide branch_fn or use SquaredGenerator instead.", UserWarning, stacklevel=2, ) super().__init__(name=f"BP(omega={self.omega:g})", C=float(C), branch_fn=branch_fn)
[docs] def inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: """Branch-wise inverse gradient map for BP. The theoretical domain restriction is ``t = 1 + sign*v/k > 0``. In finite samples (and especially under cross fitting) the linear predictor can violate this constraint. Instead of raising an exception, we **clip** ``t`` to a small positive value. """ X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") s = self._sign(X_, v_) k = 1.0 + 1.0 / self.omega t = 1.0 + s * v_ / k t = np.maximum(t, 1e-6) return s * (self.C + np.power(t, 1.0 / self.omega))
[docs] def g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.abs(a) - self.C t = np.maximum(t, 1e-12) return (np.power(t, 1.0 + self.omega) - (1.0 + self.omega) * t) / self.omega
[docs] def grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.abs(a) - self.C t = np.maximum(t, 1e-12) k = 1.0 + 1.0 / self.omega return np.sign(a) * k * (np.power(t, self.omega) - 1.0)
[docs] def grad2(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.abs(a) - self.C t = np.maximum(t, 1e-12) k = 1.0 + 1.0 / self.omega return k * self.omega * np.power(t, self.omega - 1.0)
[docs] class BKLGenerator(BregmanGenerator): """Binary KL generator (BKL-Riesz). The generator is g(alpha) = (|alpha| - C) log(|alpha| - C) - (|alpha| + C) log(|alpha| + C), with domain ``|alpha| > C`` and ``C > 0``. Its derivative is g'(alpha) = sign(alpha) * log( (|alpha|-C) / (|alpha|+C) ). The inverse gradient is branch-wise. Let ``s`` be the desired sign branch (+1 or -1) and let ``u = s * v``. Since the log-ratio is always negative, the theoretical domain is ``u < 0``. In finite samples (and especially under cross fitting), ``u`` may violate this. Instead of raising an exception, we **clip** ``u`` to a small negative value to keep the link well-defined. If ``branch_fn`` is provided, it selects the sign branch. """ def __init__(self, C: float = 1.0, *, branch_fn: BranchFn | None = None): if float(C) <= 0: raise ValueError("C must be > 0 for BKLGenerator") super().__init__(name="BKL", C=float(C), branch_fn=branch_fn)
[docs] def inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") # For BKL, the positive branch corresponds to v <= 0. if self.branch_fn is None: s = np.where(v_ <= 0.0, 1.0, -1.0) else: s = self._sign(X_, v_) # Enforce u = s*v < 0. u = s * v_ u = np.clip(u, -700.0, -1e-8) t = np.exp(u) # in (0, 1) denom = 1.0 - t denom = np.maximum(denom, 1e-12) a = self.C * (1.0 + t) / denom return s * a
[docs] def g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t1 = np.abs(a) - self.C t2 = np.abs(a) + self.C t1 = np.maximum(t1, 1e-12) t2 = np.maximum(t2, 1e-12) return t1 * np.log(t1) - t2 * np.log(t2)
[docs] def grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t1 = np.abs(a) - self.C t2 = np.abs(a) + self.C t1 = np.maximum(t1, 1e-12) t2 = np.maximum(t2, 1e-12) return np.sign(a) * (np.log(t1) - np.log(t2))
[docs] def grad2(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") abs_a = np.abs(a) denom = abs_a * abs_a - self.C * self.C denom = np.maximum(denom, 1e-12) return (2.0 * self.C) / denom
[docs] class PUGenerator(BregmanGenerator): """PU generator (PU-Riesz). This generator is based on the binary-entropy potential g(alpha) = C * [ |alpha| log|alpha| + (1-|alpha|) log(1-|alpha|) ], with domain ``|alpha| in (0, 1)`` and ``C > 0``. The derivative is g'(alpha) = sign(alpha) * C * log( |alpha| / (1-|alpha|) ). The inverse gradient is a (scaled) logistic map. Notes ----- This generator is primarily useful when you want the representer to be bounded (in absolute value) by 1. """ def __init__(self, C: float = 1.0, *, branch_fn: BranchFn | None = None): if float(C) <= 0: raise ValueError("C must be > 0 for PUGenerator") super().__init__(name="PU", C=float(C), branch_fn=branch_fn)
[docs] def inv_grad(self, X: ArrayLike, v: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) v_ = _as_1d(v, n=len(X_), name="v") s = self._sign(X_, v_) z = np.clip(s * v_ / self.C, -700.0, 700.0) a = 1.0 / (1.0 + np.exp(-z)) a = np.clip(a, 1e-10, 1.0 - 1e-10) return s * a
[docs] def g(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.clip(np.abs(a), 1e-10, 1.0 - 1e-10) return self.C * (t * np.log(t) + (1.0 - t) * np.log(1.0 - t))
[docs] def grad(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.clip(np.abs(a), 1e-10, 1.0 - 1e-10) return np.sign(a) * self.C * (np.log(t) - np.log(1.0 - t))
[docs] def grad2(self, X: ArrayLike, alpha: ArrayLike) -> NDArray[np.float64]: X_ = _as_2d(X) a = _as_1d(alpha, n=len(X_), name="alpha") t = np.clip(np.abs(a), 1e-10, 1.0 - 1e-10) return self.C / (t * (1.0 - t))