"""
Covariance estimation for 2SLS and LIML IV estimators
"""
from __future__ import annotations
from typing import Any, Callable, Union, cast
from mypy_extensions import VarArg
from numpy import (
arange,
asarray,
ceil,
cos,
empty,
int64,
ndarray,
ones,
pi,
sin,
sum as npsum,
unique,
zeros,
)
from numpy.linalg import inv, pinv
from linearmodels.shared.covariance import cov_cluster, cov_kernel
from linearmodels.typing import AnyArray, Float64Array, Numeric, OptionalNumeric
KernelWeight = Union[
Callable[[float, float], ndarray],
Callable[[float, int], ndarray],
Callable[[float, VarArg(Any)], ndarray],
]
CLUSTER_ERR = """
clusters has the wrong nobs. Expected {0}, got {1}. Any missing observations
in the regression variables have been dropped. When using a clustered
covariance estimator, drop missing data before estimating the model. The model
property `notnull` contains the locations of the observations that have no
missing values."""
def kernel_weight_bartlett(bw: float, *args: int) -> Float64Array:
r"""
    Kernel weights from a Bartlett kernel

    Parameters
    ----------
    bw : float
        Maximum lag to use in the kernel (truncated to an integer)

    Returns
    -------
    ndarray
        Weight array ordered by lag position (maxlag + 1)

    Notes
    -----
    .. math::

       w_i = 1 - i / (m + 1), \, i \leq m
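
    Examples
    --------
    A minimal sketch computing the weights for a bandwidth of 4 lags:

    >>> from linearmodels.iv.covariance import kernel_weight_bartlett
    >>> kernel_weight_bartlett(4.0)
    array([1. , 0.8, 0.6, 0.4, 0.2])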
"""
return 1 - arange(int(bw) + 1) / (int(bw) + 1)
def kernel_weight_quadratic_spectral(bw: float, n: int) -> Float64Array:
r"""
    Kernel weights from a quadratic-spectral kernel

    Parameters
    ----------
    bw : float
        Kernel bandwidth
    n : int
        Positive number of weights to return

    Returns
    -------
    ndarray
        Weight array ordered by lag position (n + 1)

    Notes
    -----
    Unlike the Bartlett or Parzen kernels, the QS kernel is not truncated at
    a specific lag, and so weights are computed for all available lags in
    the sample.

    .. math::

       z_i & = 6 \pi (i / m) / 5 \\
       w_0 & = 1 \\
       w_i & = 3(\sin(z_i)/z_i - \cos(z_i))/z_i^2, \, i \geq 1

    where m is the bandwidth.
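
    Examples
    --------
    A minimal sketch requesting weights for the first 5 lag positions with a
    bandwidth of 10; the weight at lag 0 is always 1:

    >>> from linearmodels.iv.covariance import kernel_weight_quadratic_spectral
    >>> w = kernel_weight_quadratic_spectral(10.0, 4)
    >>> w.shape
    (5,)
    >>> float(w[0])
    1.0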
"""
    if bw == 0:  # Special handling of bw == 0: only lag 0 receives weight
        w = zeros(n + 1)
        w[0] = 1
        return w
z = arange(n + 1) / float(bw)
w = 6 * pi * z / 5
w[0] = 1
w[1:] = 3 / w[1:] ** 2 * (sin(w[1:]) / w[1:] - cos(w[1:]))
return w
def kernel_weight_parzen(bw: float, *args: int) -> Float64Array:
r"""
    Kernel weights from a Parzen kernel

    Parameters
    ----------
    bw : float
        Maximum lag to use in the kernel (truncated to an integer)

    Returns
    -------
    ndarray
        Weight array ordered by lag position (maxlag + 1)

    Notes
    -----
    .. math::

       z_i & = i / (m+1) \\
       w_i & = 1-6z_i^2+6z_i^3, \, z_i \leq 0.5 \\
       w_i & = 2(1-z_i)^3, \, z_i > 0.5
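
    Examples
    --------
    A minimal sketch with a bandwidth of 2 lags; Parzen weights decay more
    quickly than Bartlett weights:

    >>> from linearmodels.iv.covariance import kernel_weight_parzen
    >>> w = kernel_weight_parzen(2)
    >>> w.shape
    (3,)
    >>> float(w[0])
    1.0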
"""
z = arange(int(bw) + 1) / (int(bw) + 1)
w = 1 - 6 * z**2 + 6 * z**3
w[z > 0.5] = 2 * (1 - z[z > 0.5]) ** 3
return w
def kernel_optimal_bandwidth(x: Float64Array, kernel: str = "bartlett") -> int:
"""
    Estimate the optimal bandwidth to use with a kernel covariance estimator

    Parameters
    ----------
    x : ndarray
        Array of data to use when computing optimal bandwidth
    kernel : str
        Name of kernel to use. Supported kernels include:

        * "bartlett", "newey-west" - Bartlett's kernel
        * "parzen", "gallant" - Parzen's kernel
        * "qs", "quadratic-spectral", "andrews" - Quadratic spectral kernel

    Returns
    -------
    int
        Optimal bandwidth. Set to nobs - 1 if the computed bandwidth is larger.

    Notes
    -----
    .. todo::

       * Explain mathematics involved
       * References

    See Also
    --------
    linearmodels.iv.covariance.kernel_weight_bartlett,
    linearmodels.iv.covariance.kernel_weight_parzen,
    linearmodels.iv.covariance.kernel_weight_quadratic_spectral
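
    Examples
    --------
    A minimal sketch using simulated autocorrelated data; the selected
    bandwidth is data-dependent:

    >>> import numpy as np
    >>> from linearmodels.iv.covariance import kernel_optimal_bandwidth
    >>> rs = np.random.RandomState(0)
    >>> x = rs.standard_normal(500)
    >>> for i in range(1, 500):
    ...     x[i] += 0.5 * x[i - 1]
    >>> bw = kernel_optimal_bandwidth(x, kernel="bartlett")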
"""
t = x.shape[0]
x = x.squeeze()
if kernel in ("bartlett", "newey-west"):
q, c = 1, 1.1447
m_star = int(ceil(4 * (t / 100) ** (2 / 9)))
elif kernel in ("qs", "andrews", "quadratic-spectral"):
q, c = 2, 1.3221
m_star = int(ceil(4 * (t / 100) ** (2 / 25)))
elif kernel in ("gallant", "parzen"):
q, c = 2, 2.6614
m_star = int(ceil(4 * (t / 100) ** (4 / 25)))
else:
raise ValueError(f"Unknown kernel: {kernel}")
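    # Autocovariances of x up to the preliminary truncation lag m_star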
sigma = empty(m_star + 1)
sigma[0] = x.T @ x / t
for i in range(1, m_star + 1):
sigma[i] = x[i:].T @ x[:-i] / t
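    # Plug-in bandwidth rule: s0 estimates the long-run variance, sq weights
    # the autocovariances by lag**q, and (q, c) are kernel-specific constants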
s0 = sigma[0] + 2 * sigma[1:].sum()
sq = 2 * npsum(sigma[1:] * arange(1, m_star + 1) ** q)
rate = 1 / (2 * q + 1)
gamma = c * ((sq / s0) ** 2) ** rate
m = gamma * t**rate
return min(int(ceil(m)), t - 1)
KERNEL_LOOKUP: dict[str, KernelWeight] = {
"bartlett": kernel_weight_bartlett,
"newey-west": kernel_weight_bartlett,
"quadratic-spectral": kernel_weight_quadratic_spectral,
"qs": kernel_weight_quadratic_spectral,
"andrews": kernel_weight_quadratic_spectral,
"gallant": kernel_weight_parzen,
"parzen": kernel_weight_parzen,
}
class HomoskedasticCovariance:
r"""
    Covariance estimation for homoskedastic data

    Parameters
    ----------
    x : ndarray
        Model regressors (nobs by nvar)
    y : ndarray
        Series modeled (nobs by 1)
    z : ndarray
        Instruments used for endogenous regressors (nobs by ninstr)
    params : ndarray
        Estimated model parameters (nvar by 1)
    debiased : bool
        Flag indicating whether to use a small-sample adjustment
    kappa : float
        Value of kappa in k-class estimator

    Notes
    -----
    Covariance is estimated using

    .. math::

        n^{-1} s^2 V^{-1}

    where

    .. math::

        s^2 = n^{-1} \sum_{i=1}^n \hat{\epsilon}_i^2

    If ``debiased`` is true, then :math:`s^2` is scaled by n / (n-k).

    .. math::

        V = n^{-1} X'Z(Z'Z)^{-1}Z'X

    where :math:`X` is the matrix of variables included in the model and
    :math:`Z` is the matrix of instruments, including exogenous regressors.
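
    Examples
    --------
    A minimal sketch using simulated data, with the data-generating
    parameters standing in for estimates and the regressors used as their
    own instruments:

    >>> import numpy as np
    >>> from linearmodels.iv.covariance import HomoskedasticCovariance
    >>> rs = np.random.RandomState(0)
    >>> x = rs.standard_normal((500, 3))
    >>> params = np.array([[1.0], [0.5], [-0.2]])
    >>> y = x @ params + rs.standard_normal((500, 1))
    >>> cov_est = HomoskedasticCovariance(x, y, x, params, debiased=True)
    >>> cov_est.cov.shape
    (3, 3)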
"""
def __init__(
self,
x: Float64Array,
y: Float64Array,
z: Float64Array,
params: Float64Array,
debiased: bool = False,
kappa: Numeric = 1,
):
if not (x.shape[0] == y.shape[0] == z.shape[0]):
raise ValueError("x, y and z must have the same number of rows")
if not x.shape[1] == len(params):
raise ValueError("x and params must have compatible dimensions")
self.x = x
self.y = y
self.z = z
self.params = params
self._debiased = debiased
self.eps = y - x @ params
self._kappa = kappa
self._pinvz = pinv(z)
nobs, nvar = x.shape
self._scale: float = nobs / (nobs - nvar) if self._debiased else 1.0
self._name = "Unadjusted Covariance (Homoskedastic)"
def __str__(self) -> str:
out = self._name
out += f"\nDebiased: {self._debiased}"
if self._kappa != 1:
out += f"\nKappa: {self._kappa:0.3f}"
return out
def __repr__(self) -> str:
return (
self.__str__() + "\n" + self.__class__.__name__ + f", id: {hex(id(self))}"
)
@property
def s(self) -> Float64Array:
"""Score covariance estimate"""
x, z, eps = self.x, self.z, self.eps
nobs = x.shape[0]
s2 = eps.T @ eps / nobs
pinvz = self._pinvz
v = (x.T @ z) @ (pinvz @ x) / nobs
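        # k-class estimators (kappa != 1) use a weighted combination of X'X
        # and the projected moment matrix X'Z(Z'Z)^{-1}Z'X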
if self._kappa != 1:
kappa = self._kappa
xpx = x.T @ x / nobs
v = (1 - kappa) * xpx + kappa * v
return self._scale * s2 * v
@property
def cov(self) -> Float64Array:
"""Covariance of estimated parameters"""
x, z = self.x, self.z
nobs = x.shape[0]
pinvz = self._pinvz
v = (x.T @ z) @ (pinvz @ x) / nobs
if self._kappa != 1:
kappa = self._kappa
xpx = x.T @ x / nobs
v = (1 - kappa) * xpx + kappa * v
vinv = inv(v)
c = vinv @ self.s @ vinv / nobs
return (c + c.T) / 2
@property
def s2(self) -> Float64Array:
"""
Estimated variance of residuals. Small-sample adjusted if debiased.
"""
nobs = self.x.shape[0]
eps = self.eps
return self._scale * eps.T @ eps / nobs
@property
def debiased(self) -> bool:
"""Flag indicating if covariance is debiased"""
return self._debiased
@property
def config(self) -> dict[str, Any]:
return {"debiased": self.debiased, "kappa": self._kappa}
class HeteroskedasticCovariance(HomoskedasticCovariance):
r"""
    Covariance estimation for heteroskedastic data

    Parameters
    ----------
    x : ndarray
        Model regressors (nobs by nvar)
    y : ndarray
        Series modeled (nobs by 1)
    z : ndarray
        Instruments used for endogenous regressors (nobs by ninstr)
    params : ndarray
        Estimated model parameters (nvar by 1)
    debiased : bool
        Flag indicating whether to use a small-sample adjustment
    kappa : float
        Value of kappa in k-class estimator

    Notes
    -----
    Covariance is estimated using

    .. math::

        n^{-1} V^{-1} \hat{S} V^{-1}

    where

    .. math::

        \hat{S} = n^{-1} \sum_{i=1}^n \hat{\epsilon}_i^2 \hat{x}_i^{\prime} \hat{x}_i

    where :math:`\hat{\gamma}=(Z'Z)^{-1}(Z'X)` and
    :math:`\hat{x}_i = z_i\hat{\gamma}`. If ``debiased`` is true, then
    :math:`\hat{S}` is scaled by n / (n-k).

    .. math::

        V = n^{-1} X'Z(Z'Z)^{-1}Z'X

    where :math:`X` is the matrix of variables included in the model and
    :math:`Z` is the matrix of instruments, including exogenous regressors.
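
    Examples
    --------
    A minimal sketch using simulated heteroskedastic data, with the
    data-generating parameters standing in for estimates:

    >>> import numpy as np
    >>> from linearmodels.iv.covariance import HeteroskedasticCovariance
    >>> rs = np.random.RandomState(0)
    >>> x = rs.standard_normal((500, 3))
    >>> params = np.array([[1.0], [0.5], [-0.2]])
    >>> e = np.abs(x[:, :1]) * rs.standard_normal((500, 1))
    >>> y = x @ params + e
    >>> cov_est = HeteroskedasticCovariance(x, y, x, params)
    >>> cov_est.cov.shape
    (3, 3)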
"""
def __init__(
self,
x: Float64Array,
y: Float64Array,
z: Float64Array,
params: Float64Array,
debiased: bool = False,
kappa: Numeric = 1,
):
super().__init__(x, y, z, params, debiased, kappa)
self._name = "Robust Covariance (Heteroskedastic)"
@property
def s(self) -> Float64Array:
"""Heteroskedasticity-robust score covariance estimate"""
x, z, eps = self.x, self.z, self.eps
nobs = x.shape[0]
pinvz = self._pinvz
xhat_e = z @ (pinvz @ x) * eps
s = xhat_e.T @ xhat_e / nobs
return self._scale * s
class KernelCovariance(HomoskedasticCovariance):
r"""
    Kernel weighted (HAC) covariance estimation

    Parameters
    ----------
    x : ndarray
        Model regressors (nobs by nvar)
    y : ndarray
        Series modeled (nobs by 1)
    z : ndarray
        Instruments used for endogenous regressors (nobs by ninstr)
    params : ndarray
        Estimated model parameters (nvar by 1)
    kernel : str
        Kernel name. Supported kernels are:

        * "bartlett", "newey-west" - Triangular kernel
        * "qs", "quadratic-spectral", "andrews" - Quadratic spectral kernel
        * "parzen", "gallant" - Parzen's kernel

    bandwidth : {int, None}
        Non-negative bandwidth to use with the kernel. If None, automatic
        bandwidth selection is used.
    debiased : bool
        Flag indicating whether to use a small-sample adjustment
    kappa : float
        Value of kappa in k-class estimator

    Notes
    -----
    Covariance is estimated using

    .. math::

        n^{-1} V^{-1} \hat{S} V^{-1}

    where

    .. math::

        \hat{S}_0 & = n^{-1} \sum_{i=1}^{n} \hat{\epsilon}^2_i \hat{x}_i^{\prime}
        \hat{x}_{i} \\
        \hat{S}_j & = n^{-1} \sum_{i=1}^{n-j}
        \hat{\epsilon}_i\hat{\epsilon}_{i+j} (\hat{x}_i^{\prime}
        \hat{x}_{i+j} + \hat{x}_{i+j}^{\prime} \hat{x}_{i}) \\
        \hat{S} & = \sum_{j=0}^{bw} K(j, bw) \hat{S}_j

    where :math:`\hat{\gamma}=(Z'Z)^{-1}(Z'X)`,
    :math:`\hat{x}_i = z_i\hat{\gamma}` and :math:`K(j, bw)` is a weight that
    depends on the kernel. If ``debiased`` is true, then :math:`\hat{S}` is
    scaled by n / (n-k).

    .. math::

        V = n^{-1} X'Z(Z'Z)^{-1}Z'X

    where :math:`X` is the matrix of variables included in the model and
    :math:`Z` is the matrix of instruments, including exogenous regressors.

    See Also
    --------
    linearmodels.iv.covariance.kernel_weight_bartlett,
    linearmodels.iv.covariance.kernel_weight_parzen,
    linearmodels.iv.covariance.kernel_weight_quadratic_spectral
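
    Examples
    --------
    A minimal sketch using simulated data and a fixed Bartlett bandwidth;
    setting ``bandwidth=None`` (the default) selects it automatically:

    >>> import numpy as np
    >>> from linearmodels.iv.covariance import KernelCovariance
    >>> rs = np.random.RandomState(0)
    >>> x = rs.standard_normal((500, 3))
    >>> params = np.array([[1.0], [0.5], [-0.2]])
    >>> y = x @ params + rs.standard_normal((500, 1))
    >>> cov_est = KernelCovariance(x, y, x, params, kernel="bartlett", bandwidth=12)
    >>> cov_est.cov.shape
    (3, 3)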
"""
def __init__(
self,
x: Float64Array,
y: Float64Array,
z: Float64Array,
params: Float64Array,
kernel: str = "bartlett",
bandwidth: OptionalNumeric = None,
debiased: bool = False,
kappa: Numeric = 1,
):
super().__init__(x, y, z, params, debiased, kappa)
self._kernels = KERNEL_LOOKUP
self._kernel = kernel
self._bandwidth = bandwidth
self._auto_bandwidth = False
self._name = "Kernel Covariance (HAC)"
if kernel not in KERNEL_LOOKUP:
raise ValueError(f"Unknown kernel: {kernel}")
def __str__(self) -> str:
out = super().__str__()
out += f"\nKernel: {self._kernel}"
out += f"\nAutomatic Bandwidth: {self._auto_bandwidth}"
if self._bandwidth:
out += f"\nBandwidth: {self._bandwidth}"
return out
@property
def s(self) -> Float64Array:
"""HAC score covariance estimate"""
x, z, eps = self.x, self.z, self.eps
nobs = x.shape[0]
pinvz = self._pinvz
xhat = z @ (pinvz @ x)
xhat_e = asarray(xhat * eps, dtype=float)
kernel = self.config["kernel"]
bw = self.config["bandwidth"]
if bw is None:
self._auto_bandwidth = True
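            # Select the bandwidth from the sum of the non-constant columns
            # of the scores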
from linearmodels.shared.linalg import has_constant
const, loc = has_constant(xhat)
sel = ones((xhat.shape[1], 1))
if const and xhat.shape[1] > 1:
sel[loc] = 0
scores = xhat_e @ sel
bw = kernel_optimal_bandwidth(scores, kernel)
self._bandwidth = bw
w = self._kernels[kernel](bw, nobs - 1)
s = cov_kernel(xhat_e, w)
return cast(ndarray, self._scale * s)
@property
def config(self) -> dict[str, Any]:
return {
"debiased": self.debiased,
"bandwidth": self._bandwidth,
"kernel": self._kernel,
"kappa": self._kappa,
}
class ClusteredCovariance(HomoskedasticCovariance):
r"""
    Covariance estimation for clustered data

    Parameters
    ----------
    x : ndarray
        Model regressors (nobs by nvar)
    y : ndarray
        Series modeled (nobs by 1)
    z : ndarray
        Instruments used for endogenous regressors (nobs by ninstr)
    params : ndarray
        Estimated model parameters (nvar by 1)
    clusters : ndarray
        Cluster group assignments with shape (nobs, ncluster) where ncluster
        is 1 or 2. If not provided, each observation is assigned to its own
        cluster.
    debiased : bool
        Flag indicating whether to use a small-sample adjustment
    kappa : float
        Value of kappa in k-class estimator

    Notes
    -----
    Covariance is estimated using

    .. math::

        n^{-1} V^{-1} \hat{S} V^{-1}

    where

    .. math::

        \hat{S} & = n^{-1} (G/(G-1)) \sum_{g=1}^G \xi_{g}^\prime \xi_{g} \\
        \xi_{g} & = \sum_{i\in\mathcal{G}_g} \hat{\epsilon}_i \hat{x}_i

    where :math:`\hat{\gamma}=(Z'Z)^{-1}(Z'X)` and
    :math:`\hat{x}_i = z_i\hat{\gamma}`. :math:`\mathcal{G}_g` contains the
    indices of elements in cluster g. If ``debiased`` is true, then
    :math:`\hat{S}` is scaled by g(n - 1) / ((g-1)(n-k)) where g is the
    number of groups.

    .. math::

        V = n^{-1} X'Z(Z'Z)^{-1}Z'X

    where :math:`X` is the matrix of variables included in the model and
    :math:`Z` is the matrix of instruments, including exogenous regressors.
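
    Examples
    --------
    A minimal sketch using simulated data with 10 one-way clusters:

    >>> import numpy as np
    >>> from linearmodels.iv.covariance import ClusteredCovariance
    >>> rs = np.random.RandomState(0)
    >>> x = rs.standard_normal((500, 3))
    >>> params = np.array([[1.0], [0.5], [-0.2]])
    >>> y = x @ params + rs.standard_normal((500, 1))
    >>> clusters = rs.randint(0, 10, 500)
    >>> cov_est = ClusteredCovariance(x, y, x, params, clusters=clusters)
    >>> cov_est.cov.shape
    (3, 3)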
"""
def __init__(
self,
x: Float64Array,
y: Float64Array,
z: Float64Array,
params: Float64Array,
clusters: AnyArray | None = None,
debiased: bool = False,
kappa: Numeric = 1,
):
super().__init__(x, y, z, params, debiased, kappa)
nobs = x.shape[0]
clusters = arange(nobs) if clusters is None else clusters
clusters = cast(AnyArray, asarray(clusters).squeeze())
if clusters.shape[0] != nobs:
raise ValueError(CLUSTER_ERR.format(nobs, clusters.shape[0]))
self._clusters = clusters
if clusters.ndim == 1:
self._num_clusters = [len(unique(clusters))]
self._num_clusters_str = str(self._num_clusters[0])
else:
self._num_clusters = [
len(unique(clusters[:, 0])),
len(unique(clusters[:, 1])),
]
self._num_clusters_str = ", ".join(map(str, self._num_clusters))
self._name = "Clustered Covariance (One-Way)"
def __str__(self) -> str:
out = super().__str__()
out += f"\nNum Clusters: {self._num_clusters_str}"
return out
@property
def s(self) -> Float64Array:
"""Clustered estimator of score covariance"""
def rescale(s: Float64Array, nc: int, nobs: int) -> Float64Array:
scale = float(self._scale * (nc / (nc - 1)) * ((nobs - 1) / nobs))
return cast(ndarray, s * scale) if self.debiased else s
x, z, eps = self.x, self.z, self.eps
pinvz = self._pinvz
xhat_e = asarray(z @ (pinvz @ x) * eps, dtype=float)
nobs = x.shape[0]
clusters = self._clusters
if self._clusters.ndim == 1:
s = cov_cluster(xhat_e, clusters)
s = rescale(s, self._num_clusters[0], nobs)
else:
s0 = cov_cluster(xhat_e, clusters[:, 0].squeeze())
s0 = rescale(s0, self._num_clusters[0], nobs)
s1 = cov_cluster(xhat_e, clusters[:, 1].squeeze())
s1 = rescale(s1, self._num_clusters[1], nobs)
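            # Two-way clustering: encode the intersection of the two cluster
            # ids and combine as S = S0 + S1 - S01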
c0 = clusters[:, 0] - clusters[:, 0].min() + 1
c1 = clusters[:, 1] - clusters[:, 1].min() + 1
c01 = (c0 * (c1.max() + 1) + c1).astype(int64)
s01 = cov_cluster(xhat_e, c01.squeeze())
nc = len(unique(c01))
s01 = rescale(s01, nc, nobs)
s = s0 + s1 - s01
return s
@property
def config(self) -> dict[str, Any]:
return {
"debiased": self.debiased,
"clusters": self._clusters,
"kappa": self._kappa,
}