Source code for linearmodels.system.covariance

from __future__ import annotations

from typing import cast

from numpy import asarray, empty, eye, ndarray, ones, sqrt, vstack, zeros
from numpy.linalg import inv

from linearmodels.asset_pricing.covariance import _HACMixin
from linearmodels.iv.covariance import cov_cluster
from linearmodels.panel.covariance import cluster_union
from linearmodels.shared.covariance import group_debias_coefficient
from linearmodels.shared.utility import AttrDict
from linearmodels.system._utility import (
    LinearConstraint,
    blocked_cross_prod,
    blocked_diag_product,
    blocked_inner_prod,
)
from linearmodels.typing import Float64Array, IntArray

CLUSTERS_FORMAT = """\
clusters must be an ndarray with as shape (nobs, ncluster) where ncluster is the \
number of clustering variables.  Clustering is only supported in 1 or 2 dimensions.
"""


[docs]class HomoskedasticCovariance: r""" Homoskedastic covariance estimation for system regression Parameters ---------- x : list[ndarray] List of regressor arrays (ndependent) eps : ndarray Model residuals, ndependent by nobs sigma : ndarray Covariance matrix estimator of eps gls : bool Flag indicating to compute the GLS covariance estimator. If False, assume OLS was used debiased : bool Flag indicating to apply a small sample adjustment constraints : {None, LinearConstraint} Constraints used in estimation, if any Notes ----- If GLS is used, the covariance is estimated by .. math:: (X'\Omega^{-1}X)^{-1} where X is a block diagonal matrix of exogenous variables. When GLS is not used, the covariance is estimated by .. math:: (X'X)^{-1}(X'\Omega X)(X'X)^{-1} """ def __init__( self, x: list[ndarray], eps: Float64Array, sigma: Float64Array, full_sigma: Float64Array, *, gls: bool = False, debiased: bool = False, constraints: LinearConstraint | None = None, ) -> None: self._eps = eps self._x = x self._nobs = eps.shape[0] self._k = len(x) self._sigma = sigma self._full_sigma = full_sigma self._gls = gls self._debiased = debiased self._constraints = constraints self._name = "Homoskedastic (Unadjusted) Covariance" self._str_extra = AttrDict(Debiased=self._debiased, GLS=self._gls) self._cov_config = AttrDict(debiased=self._debiased) def __str__(self) -> str: out = self._name extra: list[str] = [] for key in self._str_extra: extra.append(": ".join([str(key), str(self._str_extra[key])])) if extra: out += " (" + ", ".join(extra) + ")" return out def __repr__(self) -> str: out = self.__str__() return out + f", id: {hex(id(self))}" @property def sigma(self) -> Float64Array: """Error covariance""" return self._sigma def _adjustment(self) -> float | ndarray: # Sigma is pre-debiased return 1.0 def _mvreg_cov(self) -> Float64Array: x = self._x xeex = blocked_inner_prod(x, self._sigma) xpx = blocked_inner_prod(self._x, eye(len(x))) if self._constraints is None: xpxi = inv(xpx) cov = xpxi @ xeex @ xpxi else: cons = self._constraints xpx = cons.t.T @ xpx @ cons.t xpxi = inv(xpx) xeex = cons.t.T @ xeex @ cons.t cov = cons.t @ (xpxi @ xeex @ xpxi) @ cons.t.T cov = (cov + cov.T) / 2 return cov def _gls_cov(self) -> Float64Array: x = self._x sigma = self._sigma sigma_inv = inv(sigma) xpx = blocked_inner_prod(x, sigma_inv) # Handles case where sigma_inv is not inverse of full_sigma xeex = blocked_inner_prod(x, sigma_inv @ self._full_sigma @ sigma_inv) if self._constraints is None: xpxi = inv(xpx) cov = xpxi @ xeex @ xpxi else: cons = self._constraints xpx = cons.t.T @ xpx @ cons.t xpxi = inv(xpx) xeex = cons.t.T @ xeex @ cons.t cov = cons.t @ (xpxi @ xeex @ xpxi) @ cons.t.T cov = (cov + cov.T) / 2 return cov @property def cov(self) -> Float64Array: """Parameter covariance""" adj = self._adjustment() if self._gls: return cast(ndarray, adj * self._gls_cov()) else: return cast(ndarray, adj * self._mvreg_cov()) @property def cov_config(self) -> AttrDict: """Optional configuration information used in covariance""" return self._cov_config
[docs]class HeteroskedasticCovariance(HomoskedasticCovariance): r""" Heteroskedastic covariance estimation for system regression Parameters ---------- x : list[ndarray] ndependent element list of regressor eps : ndarray Model residuals, ndependent by nobs sigma : ndarray Covariance matrix estimator of eps gls : bool Flag indicating to compute the GLS covariance estimator. If False, assume OLS was used debiased : bool Flag indicating to apply a small sample adjustment constraints : {None, LinearConstraint} Constraints used in estimation, if any Notes ----- If GLS is used, the covariance is estimated by .. math:: (X'\Omega^{-1}X)^{-1}\tilde{S}(X'\Omega^{-1}X)^{-1} where X is a block diagonal matrix of exogenous variables and where :math:`\tilde{S}` is a estimator of the model scores based on the model residuals and the weighted X matrix :math:`\Omega^{-1/2}X`. When GLS is not used, the covariance is estimated by .. math:: (X'X)^{-1}\hat{S}(X'X)^{-1} where :math:`\hat{S}` is a estimator of the covariance of the model scores. """ def __init__( self, x: list[ndarray], eps: Float64Array, sigma: Float64Array, full_sigma: Float64Array, *, gls: bool = False, debiased: bool = False, constraints: LinearConstraint | None = None, ) -> None: super().__init__( x, eps, sigma, full_sigma, gls=gls, debiased=debiased, constraints=constraints, ) self._name = "Heteroskedastic (Robust) Covariance" k = len(x) nobs = eps.shape[0] if gls: weights = inv(sigma) bigx = blocked_diag_product(x, weights) e = eps.T.ravel()[:, None] bigxe = bigx * e m = bigx.shape[1] xe = zeros((nobs, m)) for i in range(len(x)): xe += bigxe[i * nobs : (i + 1) * nobs] else: # Do not require blocking when not using GLS k_tot = sum(map(lambda a: a.shape[1], x)) xe = empty((nobs, k_tot)) loc = 0 for i in range(k): offset = x[i].shape[1] xe[:, loc : loc + offset] = x[i] * eps[:, i : i + 1] loc += offset self._moments = xe def _xeex(self) -> Float64Array: nobs = self._moments.shape[0] return self._moments.T @ self._moments / nobs def _cov(self, gls: bool) -> Float64Array: x = self._x nobs = x[0].shape[0] k = len(x) sigma = self.sigma weights = inv(sigma) if gls else eye(k) xpx = blocked_inner_prod(x, weights) / nobs xeex = self._xeex() if self._constraints is None: xpxi = inv(xpx) cov = xpxi @ xeex @ xpxi else: assert self._constraints is not None cons = self._constraints xpx = cons.t.T @ xpx @ cons.t xpxi = inv(xpx) xeex = cons.t.T @ xeex @ cons.t cov = cons.t @ (xpxi @ xeex @ xpxi) @ cons.t.T cov = (cov + cov.T) / 2 return cov / nobs def _mvreg_cov(self) -> Float64Array: return self._cov(False) def _gls_cov(self) -> Float64Array: return self._cov(True) def _adjustment(self) -> float | ndarray: if not self._debiased: return 1.0 ks = [s.shape[1] for s in self._x] nobs = self._x[0].shape[0] adj = [] for k in ks: adj.append(nobs / (nobs - k) * ones((k, 1))) adj_arr = vstack(adj) adj_arr = sqrt(adj_arr) # TODO: Check Type return adj_arr @ adj_arr.T
[docs]class KernelCovariance(HeteroskedasticCovariance, _HACMixin): r""" Kernel (HAC) covariance estimation for system regression Parameters ---------- x : list[ndarray] ndependent element list of regressor eps : ndarray Model residuals, ndependent by nobs sigma : ndarray Covariance matrix estimator of eps gls : bool Flag indicating to compute the GLS covariance estimator. If False, assume OLS was used debiased : bool Flag indicating to apply a small sample adjustment kernel : str Name of kernel to use. Supported kernels include: * "bartlett", "newey-west" : Bartlett's kernel * "parzen", "gallant" : Parzen's kernel * "qs", "quadratic-spectral", "andrews" : Quadratic spectral kernel bandwidth : float Bandwidth to use for the kernel. If not provided the optimal bandwidth will be estimated. Notes ----- If GLS is used, the covariance is estimated by .. math:: (X'\Omega^{-1}X)^{-1}\tilde{S}(X'\Omega^{-1}X)^{-1} where X is a block diagonal matrix of exogenous variables and where :math:`\tilde{S}` is a estimator of the covariance of the model scores based on the model residuals and the weighted X matrix :math:`\Omega^{-1/2}X`. When GLS is not used, the covariance is estimated by .. math:: (X'X)^{-1}\hat{S}(X'X)^{-1} where :math:`\hat{S}` is a estimator of the covariance of the model scores. See Also -------- linearmodels.iv.covariance.kernel_weight_bartlett, linearmodels.iv.covariance.kernel_weight_parzen, linearmodels.iv.covariance.kernel_weight_quadratic_spectral """ def __init__( self, x: list[ndarray], eps: Float64Array, sigma: Float64Array, full_sigma: Float64Array, *, gls: bool = False, debiased: bool = False, constraints: LinearConstraint | None = None, kernel: str = "bartlett", bandwidth: float | None = None, ): _HACMixin.__init__(self, kernel, bandwidth) super().__init__( x, eps, sigma, full_sigma, gls=gls, debiased=debiased, constraints=constraints, ) self._check_kernel(kernel) self._check_bandwidth(bandwidth) self._name = "Kernel (HAC) Covariance" self._str_extra["Kernel"] = kernel self._cov_config["kernel"] = kernel def _xeex(self) -> Float64Array: return self._kernel_cov(self._moments) @property def cov_config(self) -> AttrDict: """Optional configuration information used in covariance""" out = AttrDict([(k, v) for k, v in self._cov_config.items()]) out["bandwidth"] = self.bandwidth return out
[docs]class ClusteredCovariance(HeteroskedasticCovariance): r""" Heteroskedastic covariance estimation for system regression Parameters ---------- x : list[ndarray] ndependent element list of regressor eps : ndarray Model residuals, ndependent by nobs sigma : ndarray Covariance matrix estimator of eps gls : bool Flag indicating to compute the GLS covariance estimator. If False, assume OLS was used debiased : bool Flag indicating to apply a small sample adjustment constraints : {None, LinearConstraint} Constraints used in estimation, if any clusters : ndarray Optional array of cluster id. Must be integer valued, and have shape (nobs, ncluster) where ncluster is 1 or 2. group_debias : bool Flag indicating whether to debias by the number of groups. Notes ----- If GLS is used, the covariance is estimated by .. math:: (X'\Omega^{-1}X)^{-1}\tilde{S}_{\mathcal{G}}(X'\Omega^{-1}X)^{-1} where X is a block diagonal matrix of exogenous variables and where :math:`\tilde{S}_{\mathcal{G}}` is a clustered estimator of the model scores based on the model residuals and the weighted X matrix :math:`\Omega^{-1/2}X`. When GLS is not used, the covariance is estimated by .. math:: (X'X)^{-1}\hat{S}_{\mathcal{G}}(X'X)^{-1} where :math:`\hat{S}` is a clustered estimator of the covariance of the model scores. See Also -------- linearmodels.shared.covariance.cov_cluster linearmodels.shared.covariance.group_debias_coefficient """ def __init__( self, x: list[ndarray], eps: Float64Array, sigma: Float64Array, full_sigma: Float64Array, *, gls: bool = False, debiased: bool = False, constraints: LinearConstraint | None = None, clusters: IntArray | None = None, group_debias: bool = False, ) -> None: super().__init__( x, eps, sigma, full_sigma, gls=gls, debiased=debiased, constraints=constraints, ) self._group_debias = group_debias self._nclusters: list[int] = [] self._clusters = self._check_clusters(clusters) self._str_extra["Number of Grouping Variables"] = self._clusters.shape[1] if self._clusters.shape[1] > 0: num_cl = [f"{nc} (Variable {i})" for i, nc in enumerate(self._nclusters)] self._str_extra["Number of Groups"] = " and ".join(num_cl) self._str_extra["Group Debias"] = self._group_debias def _check_clusters(self, clusters: IntArray | None) -> IntArray: """Check cluster dimension and ensure ndarray""" if clusters is None: return empty((self._eps.size, 0), dtype=int) _clusters = asarray(clusters) if _clusters.ndim not in (1, 2): raise ValueError(CLUSTERS_FORMAT, ValueError) elif _clusters.ndim == 1: _clusters = _clusters[:, None] shape = _clusters.shape if shape[0] != self._eps.shape[0] or not 1 <= shape[1] <= 2: raise ValueError(CLUSTERS_FORMAT, ValueError) from pandas import DataFrame df = DataFrame(_clusters) nunique = df.nunique() if _clusters.shape[1] == 2: both = df.groupby([0, 1]).ngroups if both == nunique.max(): raise ValueError( "clusters must be non-nested. You must drop nested " "the nested cluster before computing the clustered" "covariance." ) self._nclusters = list(nunique) return _clusters def _xeex(self) -> Float64Array: if self._clusters.shape[1] == 0: # Heteroskedastic but not clustered return super()._xeex() elif self._clusters.shape[1] == 1: s = cov_cluster(self._moments, self._clusters[:, 0]) if self._group_debias: s *= group_debias_coefficient(self._clusters[:, 0]) return s else: xeex0 = cov_cluster(self._moments, self._clusters[:, 0]) xeex1 = cov_cluster(self._moments, self._clusters[:, 1]) clusters01 = cluster_union(self._clusters) xeex01 = cov_cluster(self._moments, clusters01) if self._group_debias: xeex0 *= group_debias_coefficient(self._clusters[:, 0]) xeex1 *= group_debias_coefficient(self._clusters[:, 1]) xeex01 *= group_debias_coefficient(clusters01) return xeex0 + xeex1 - xeex01 @property def cov_config(self) -> AttrDict: """Optional configuration information used in covariance""" out = AttrDict([(k, v) for k, v in self._cov_config.items()]) out["clusters"] = self._clusters out["group_debias"] = self._group_debias return out
[docs]class GMMHomoskedasticCovariance: r""" Covariance estimator for IV system estimation with homoskedastic data Parameters ---------- x : list[ndarray] List containing the model regressors for each equation in the system z : list[ndarray] List containing the model instruments for each equation in the system eps : ndarray nobs by neq array of residuals where each column corresponds an equation in the system w : ndarray Weighting matrix used in estimation sigma : ndarray Residual covariance used in estimation constraints : {None, LinearConstraint} Constraints used in estimation, if any Notes ----- The covariance is estimated by .. math:: (X'ZW^{-1}Z'X)^{-1}(X'ZW^{-1}\Omega W^{-1}Z'X)(X'ZW^{-1}Z'X)^{-1} where :math:`\Omega = W = Z'(\Sigma \otimes I_N)Z` where m is the number of moments in the system """ def __init__( self, x: list[ndarray], z: list[ndarray], eps: Float64Array, w: Float64Array, *, sigma: ndarray | None = None, debiased: bool = False, constraints: LinearConstraint | None = None, ) -> None: self._x = x self._z = z self._eps = eps self._sigma = sigma self._w = w self._debiased = debiased self._constraints = constraints self._name = "GMM Homoskedastic (Unadjusted) Covariance" self._cov_config = AttrDict(debiased=self._debiased) def __str__(self) -> str: out = self._name return out def __repr__(self) -> str: out = self.__str__() return out + f", id: {hex(id(self))}" @property def cov(self) -> Float64Array: """Parameter covariance""" x, z = self._x, self._z k = len(x) nobs = x[0].shape[0] xpz = blocked_cross_prod(x, z, eye(k)) xpz /= nobs wi = inv(self._w) xpz_wi_zpx = xpz @ wi @ xpz.T omega = self._omega() xpz_wi_omega_wi_zpx = xpz @ wi @ omega @ wi @ xpz.T adj = self._adjustment() if self._constraints is None: xpz_wi_zpxi = inv(xpz_wi_zpx) cov = xpz_wi_zpxi @ xpz_wi_omega_wi_zpx @ xpz_wi_zpxi / nobs else: cons = self._constraints xpz_wi_zpx = cons.t.T @ xpz_wi_zpx @ cons.t xpz_wi_zpxi = inv(xpz_wi_zpx) xpz_wi_omega_wi_zpx = cons.t.T @ xpz_wi_omega_wi_zpx @ cons.t cov = ( cons.t @ xpz_wi_zpxi @ xpz_wi_omega_wi_zpx @ xpz_wi_zpxi @ cons.t.T / nobs ) cov = (cov + cov.T) / 2 return adj * cov def _omega(self) -> Float64Array: z = self._z nobs = z[0].shape[0] sigma = self._sigma assert sigma is not None omega = blocked_inner_prod(z, sigma) omega /= nobs return omega def _adjustment(self) -> float | ndarray: if not self._debiased: return 1.0 k = [s.shape[1] for s in self._x] nobs = self._x[0].shape[0] adj = [] for i in range(len(k)): adj.append(nobs / (nobs - k[i]) * ones((k[i], 1))) adj_arr = vstack(adj) adj_arr = sqrt(adj_arr) return adj_arr @ adj_arr.T @property def cov_config(self) -> AttrDict: """Optional configuration information used in covariance""" return self._cov_config
[docs]class GMMHeteroskedasticCovariance(GMMHomoskedasticCovariance): r""" Covariance estimator for IV system estimation with homoskedastic data Parameters ---------- x : list[ndarray] List containing the model regressors for each equation in the system z : list[ndarray] List containing the model instruments for each equation in the system eps : ndarray nobs by neq array of residuals where each column corresponds an equation in the system w : ndarray Weighting matrix used in estimation sigma : ndarray Residual covariance used in estimation constraints : {None, LinearConstraint} Constraints used in estimation, if any Notes ----- The covariance is estimated by .. math:: (X'ZW^{-1}Z'X)^{-1}(X'ZW^{-1}\Omega W^{-1}Z'X)(X'ZW^{-1}Z'X)^{-1} where :math:`\Omega` is the covariance of the moment conditions. """ def __init__( self, x: list[ndarray], z: list[ndarray], eps: Float64Array, w: Float64Array, *, sigma: ndarray | None = None, debiased: bool = False, constraints: LinearConstraint | None = None, ) -> None: super().__init__( x, z, eps, w, sigma=sigma, debiased=debiased, constraints=constraints ) self._name = "GMM Heteroskedastic (Robust) Covariance" k = len(z) k_total = sum(map(lambda a: a.shape[1], z)) nobs = z[0].shape[0] loc = 0 ze = empty((nobs, k_total)) for i in range(k): kz = z[i].shape[1] ze[:, loc : loc + kz] = z[i] * eps[:, [i]] loc += kz self._moments = ze def _omega(self) -> Float64Array: z = self._z nobs = z[0].shape[0] omega = self._moments.T @ self._moments / nobs return omega
[docs]class GMMKernelCovariance(GMMHeteroskedasticCovariance, _HACMixin): r""" Covariance estimator for IV system estimation with homoskedastic data Parameters ---------- x : list[ndarray] List containing the model regressors for each equation in the system z : list[ndarray] List containing the model instruments for each equation in the system eps : ndarray nobs by neq array of residuals where each column corresponds an equation in the system w : ndarray Weighting matrix used in estimation sigma : ndarray Residual covariance used in estimation constraints : {None, LinearConstraint} Constraints used in estimation, if any kernel : str Name of kernel to use. Supported kernels include: * "bartlett", "newey-west" : Bartlett's kernel * "parzen", "gallant" : Parzen's kernel * "qs", "quadratic-spectral", "andrews" : Quadratic spectral kernel bandwidth : float Bandwidth to use for the kernel. If not provided the optimal bandwidth will be estimated. Notes ----- The covariance is estimated by .. math:: (X'ZW^{-1}Z'X)^{-1}(X'ZW^{-1}\Omega W^{-1}Z'X)(X'ZW^{-1}Z'X)^{-1} where :math:`\Omega` is the covariance of the moment conditions. """ def __init__( self, x: list[ndarray], z: list[ndarray], eps: Float64Array, w: Float64Array, *, sigma: ndarray | None = None, debiased: bool = False, constraints: LinearConstraint | None = None, kernel: str = "bartlett", bandwidth: float | None = None, ) -> None: _HACMixin.__init__(self, kernel, bandwidth) super().__init__( x, z, eps, w, sigma=sigma, debiased=debiased, constraints=constraints ) self._name = "GMM Kernel (HAC) Covariance" self._check_bandwidth(bandwidth) self._check_kernel(kernel) self._cov_config["kernel"] = kernel def _omega(self) -> Float64Array: return self._kernel_cov(self._moments) @property def cov_config(self) -> AttrDict: """Optional configuration information used in covariance""" out = AttrDict([(k, v) for k, v in self._cov_config.items()]) out["bandwidth"] = self.bandwidth return out