"""
Estimators for systems of equations
References
----------
Greene, William H. "Econometric analysis 4th edition." International edition,
New Jersey: Prentice Hall (2000).
StataCorp. 2013. Stata 13 Base Reference Manual. College Station, TX: Stata
Press.
Henningsen, A., & Hamann, J. (2007). systemfit: A Package for Estimating
Systems of Simultaneous Equations in R. Journal of Statistical Software,
23(4), 1 - 40. doi:http://dx.doi.org/10.18637/jss.v023.i04
"""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from functools import reduce
import textwrap
from typing import Any, Literal, cast
import warnings
from formulaic.utils.context import capture_context
import numpy as np
from numpy.linalg import inv, lstsq, matrix_rank, solve
import pandas
from pandas import DataFrame, Index, Series, concat
from linearmodels.iv._utility import IVFormulaParser
from linearmodels.iv.data import IVData
from linearmodels.shared.exceptions import missing_warning
from linearmodels.shared.hypotheses import InvalidTestStatistic, WaldTestStatistic
from linearmodels.shared.linalg import has_constant
from linearmodels.shared.utility import AttrDict
from linearmodels.system._utility import (
LinearConstraint,
blocked_column_product,
blocked_cross_prod,
blocked_diag_product,
blocked_inner_prod,
inv_matrix_sqrt,
)
from linearmodels.system.covariance import (
ClusteredCovariance,
GMMHeteroskedasticCovariance,
GMMHomoskedasticCovariance,
GMMKernelCovariance,
HeteroskedasticCovariance,
HomoskedasticCovariance,
KernelCovariance,
)
from linearmodels.system.gmm import (
HeteroskedasticWeightMatrix,
HomoskedasticWeightMatrix,
KernelWeightMatrix,
)
from linearmodels.system.results import GMMSystemResults, SystemResults
import linearmodels.typing.data
__all__ = ["SUR", "IV3SLS", "IVSystemGMM", "LinearConstraint"]
UNKNOWN_EQ_TYPE = """
Contents of each equation must be either a dictionary with keys "dependent"
and "exog" or a 2-element tuple of he form (dependent, exog).
equations[{key}] was {type}
"""
COV_TYPES = {
"unadjusted": "unadjusted",
"homoskedastic": "unadjusted",
"robust": "robust",
"heteroskedastic": "robust",
"kernel": "kernel",
"hac": "kernel",
"clustered": "clustered",
}
COV_EST = {
"unadjusted": HomoskedasticCovariance,
"robust": HeteroskedasticCovariance,
"kernel": KernelCovariance,
"clustered": ClusteredCovariance,
}
GMM_W_EST = {
"unadjusted": HomoskedasticWeightMatrix,
"robust": HeteroskedasticWeightMatrix,
"kernel": KernelWeightMatrix,
}
GMM_COV_EST = {
"unadjusted": GMMHomoskedasticCovariance,
"robust": GMMHeteroskedasticCovariance,
"kernel": GMMKernelCovariance,
}
def _missing_weights(
weights: Mapping[str, linearmodels.typing.data.ArrayLike | None],
) -> None:
"""Raise warning if missing weighs found"""
missing = [key for key in weights if weights[key] is None]
if missing:
msg = "Weights not found for equation labels:\n{}".format(", ".join(missing))
warnings.warn(msg, UserWarning)
def _parameters_from_xprod(
xpx: linearmodels.typing.data.Float64Array,
xpy: linearmodels.typing.data.Float64Array,
constraints: LinearConstraint | None = None,
) -> linearmodels.typing.data.Float64Array:
r"""
Estimate regression parameters from cross produces
Parameters
----------
xpx : ndarray
Cross product measuring variation in x (nvar by nvar)
xpy : ndarray
Cross produce measuring covariation between x and y (nvar by 1)
constraints : LinearConstraint
Constraints to use in estimation
Returns
-------
params : ndarray
Estimated parameters (nvar by 1)
Notes
-----
xpx and xpy can be any form similar to the two inputs into the usual
parameter estimator for a linear regression. In particular, many
estimators can be written as
.. math::
(x^\prime w x)^{-1}(x^\prime w y)
for some weight matrix :math:`w`.
"""
if constraints is not None:
cons = constraints
xpy = cons.t.T @ xpy - cons.t.T @ xpx @ cons.a.T
xpx = cons.t.T @ xpx @ cons.t
params_c = solve(xpx, xpy)
params = cons.t @ params_c + cons.a.T
else:
params = solve(xpx, xpy).astype(float, copy=False)
return params
class SystemFormulaParser:
def __init__(
self,
formula: Mapping[str, str] | str,
data: pandas.DataFrame,
weights: Mapping[str, linearmodels.typing.data.ArrayLike] | None = None,
eval_env: int = 2,
context: Mapping[str, Any] | None = None,
) -> None:
if not isinstance(formula, (Mapping, str)):
raise TypeError("formula must be a string or dictionary-like")
self._formula: Mapping[str, str] | str = formula
self._data = data
self._weights = weights
self._parsers: dict[str, IVFormulaParser] = {}
self._weight_dict: dict[str, Series | None] = {}
self._eval_env = eval_env
self._clean_formula: dict[str, str] = {}
if context is None:
self._context = capture_context(eval_env)
else:
self._context = context
self._parse()
@staticmethod
def _prevent_autoconst(formula: str) -> str:
if not (" 0+" in formula or " 0 +" in formula):
formula = "~ 0 +".join(formula.split("~"))
return formula
@staticmethod
def _convert_to_series(
value: linearmodels.typing.data.ArrayLike, name: str
) -> Series:
shape = value.shape
if len(shape) > 2 or (len(shape) == 2 and min(shape) != 1):
raise ValueError(f"{name} must be squeezable to 1D.")
if len(shape) == 1:
value_series = Series(value)
else: # len(shape) == 2 and min(shape) == 1:
if isinstance(value, DataFrame):
value_series = value.iloc[:, 0]
else:
value_series = Series(np.squeeze(value))
return value_series
def _parse(self) -> None:
formula = self._formula
data = self._data
weights = self._weights
parsers = self._parsers
weight_dict = self._weight_dict
cln_formula = self._clean_formula
if isinstance(formula, Mapping):
for formula_key in formula:
f = formula[formula_key]
f = self._prevent_autoconst(f)
parsers[formula_key] = IVFormulaParser(f, data, context=self._context)
if weights is not None:
if formula_key in weights:
value = weights[formula_key]
weight_dict[formula_key] = self._convert_to_series(
value, "weights"
)
else:
weight_dict[formula_key] = None
cln_formula[formula_key] = f
else:
formula = formula.replace("\n", " ").strip()
parts = formula.split("}")
for part in parts:
key = base_key = None
part = part.strip()
if part == "":
continue
part = part.replace("{", "")
if ":" in part.split("~")[0]:
base_key, part = part.split(":")
key = base_key = base_key.strip()
part = part.strip()
f = self._prevent_autoconst(part)
if base_key is None:
base_key = key = f.split("~")[0].strip()
count = 0
while key in parsers:
key = base_key + f".{count}"
count += 1
assert isinstance(key, str)
parsers[key] = IVFormulaParser(f, data, context=self._context)
cln_formula[key] = f
if weights is not None:
if key in weights:
value = weights[key]
weight_dict[key] = self._convert_to_series(value, "weights")
else:
weight_dict[key] = None
_missing_weights(weight_dict)
self._weight_dict = weight_dict
def _get_variable(self, variable: str) -> dict[str, DataFrame | None]:
return {key: getattr(self._parsers[key], variable) for key in self._parsers}
@property
def formula(self) -> dict[str, str]:
"""Cleaned version of formula"""
return self._clean_formula
@property
def eval_env(self) -> int:
"""Set or get the eval env depth"""
return self._eval_env
@eval_env.setter
def eval_env(self, value: int) -> None:
self._eval_env = value
self._context = capture_context(self._eval_env)
# Update parsers for new level
parsers = self._parsers
new_parsers = {}
for key in parsers:
parser = parsers[key]
new_parsers[key] = IVFormulaParser(
parser._formula, parser._data, context=self._context
)
self._parsers = new_parsers
@property
def equation_labels(self) -> list[str]:
return list(self._parsers.keys())
@property
def data(self) -> dict[str, dict[str, linearmodels.typing.data.ArrayLike | None]]:
out: dict[str, dict[str, linearmodels.typing.data.ArrayLike | None]] = {}
dep = self.dependent
for key in dep:
out[key] = {"dependent": dep[key]}
exog = self.exog
for key in exog:
out[key]["exog"] = exog[key]
endog = self.endog
for key in endog:
out[key]["endog"] = endog[key]
instr = self.instruments
for key in instr:
out[key]["instruments"] = instr[key]
for key in self._weight_dict:
if self._weight_dict[key] is not None:
out[key]["weights"] = self._weight_dict[key]
return out
@property
def dependent(self) -> dict[str, DataFrame | None]:
return self._get_variable("dependent")
@property
def exog(self) -> dict[str, DataFrame | None]:
return self._get_variable("exog")
@property
def endog(self) -> dict[str, DataFrame | None]:
return self._get_variable("endog")
@property
def instruments(self) -> dict[str, DataFrame | None]:
return self._get_variable("instruments")
class _SystemModelBase:
r"""
Base class for system estimators
Parameters
----------
equations : dict
Dictionary-like structure containing dependent, exogenous, endogenous
and instrumental variables. Each key is an equations label and must
be a string. Each value must be either a tuple of the form (dependent,
exog, endog, instrument[, weights]) or a dictionary with keys "dependent",
and at least one of "exog" or "endog" and "instruments". When using a
tuple, values must be provided for all 4 variables, although either
empty arrays or `None` can be passed if a category of variable is not
included in a model. The dictionary may contain optional keys for
"exog", "endog", "instruments", and "weights". "exog" can be omitted
if all variables in an equation are endogenous. Alternatively, "exog"
can contain either an empty array or `None` to indicate that an
equation contains no exogenous regressors. Similarly "endog" and
"instruments" can either be omitted or may contain an empty array (or
`None`) if all variables in an equation are exogenous.
sigma : array_like
Prespecified residual covariance to use in GLS estimation. If not
provided, FGLS is implemented based on an estimate of sigma.
"""
def __init__(
self,
equations: Mapping[
str,
Mapping[str, linearmodels.typing.data.ArrayLike | None]
| Sequence[linearmodels.typing.data.ArrayLike | None],
],
*,
sigma: linearmodels.typing.data.ArrayLike | None = None,
) -> None:
if not isinstance(equations, Mapping):
raise TypeError("equations must be a dictionary-like")
for key in equations:
if not isinstance(key, str):
raise ValueError("Equation labels (keys) must be strings")
self._equations = equations
self._sigma = None
if sigma is not None:
self._sigma = np.asarray(sigma)
k = len(self._equations)
if self._sigma.shape != (k, k):
raise ValueError(
"sigma must be a square matrix with dimensions "
"equal to the number of equations"
)
self._param_names: list[str] = []
self._eq_labels: list[str] = []
self._dependent: list[IVData] = []
self._exog: list[IVData] = []
self._instr: list[IVData] = []
self._endog: list[IVData] = []
self._y: list[linearmodels.typing.data.Float64Array] = []
self._x: list[linearmodels.typing.data.Float64Array] = []
self._wy: list[linearmodels.typing.data.Float64Array] = []
self._wx: list[linearmodels.typing.data.Float64Array] = []
self._w: list[linearmodels.typing.data.Float64Array] = []
self._z: list[linearmodels.typing.data.Float64Array] = []
self._wz: list[linearmodels.typing.data.Float64Array] = []
self._weights: list[IVData] = []
self._formula: str | dict[str, str] | None = None
self._constraints: LinearConstraint | None = None
self._constant_loc: list[int] = []
self._has_constant: Series = Series(dtype=bool)
self._common_exog = False
self._original_index: Index | None = None
self._model_name = "Three Stage Least Squares (3SLS)"
self._validate_data()
@property
def formula(self) -> str | dict[str, str] | None:
"""Set or get the formula used to construct the model"""
return self._formula
@formula.setter
def formula(self, value: str | dict[str, str] | None) -> None:
self._formula = value
def _validate_data(self) -> None:
ids = []
for i, key in enumerate(self._equations):
self._eq_labels.append(key)
eq_data = self._equations[key]
dep_name = "dependent_" + str(i)
exog_name = "exog_" + str(i)
endog_name = "endog_" + str(i)
instr_name = "instr_" + str(i)
if isinstance(eq_data, (tuple, list)):
dep = IVData(eq_data[0], var_name=dep_name)
self._dependent.append(dep)
current_id: tuple[int, ...] = (id(eq_data[1]),)
self._exog.append(
IVData(eq_data[1], var_name=exog_name, nobs=dep.shape[0])
)
endog = IVData(eq_data[2], var_name=endog_name, nobs=dep.shape[0])
if endog.shape[1] > 0:
current_id += (id(eq_data[2]),)
ids.append(current_id)
self._endog.append(endog)
self._instr.append(
IVData(eq_data[3], var_name=instr_name, nobs=dep.shape[0])
)
if len(eq_data) == 5:
self._weights.append(IVData(eq_data[4]))
else:
dep_shape = self._dependent[-1].shape
self._weights.append(IVData(np.ones(dep_shape)))
elif isinstance(eq_data, (dict, Mapping)):
dep = IVData(eq_data["dependent"], var_name=dep_name)
self._dependent.append(dep)
exog = eq_data.get("exog", None)
self._exog.append(IVData(exog, var_name=exog_name, nobs=dep.shape[0]))
current_id = (id(exog),)
endog_values = eq_data.get("endog", None)
endog = IVData(endog_values, var_name=endog_name, nobs=dep.shape[0])
self._endog.append(endog)
if "endog" in eq_data:
current_id += (id(eq_data["endog"]),)
ids.append(current_id)
instr_values = eq_data.get("instruments", None)
instr = IVData(instr_values, var_name=instr_name, nobs=dep.shape[0])
self._instr.append(instr)
if "weights" in eq_data:
self._weights.append(IVData(eq_data["weights"]))
else:
self._weights.append(IVData(np.ones(dep.shape)))
else:
msg = UNKNOWN_EQ_TYPE.format(key=key, type=type(vars))
raise TypeError(msg)
self._has_instruments = False
for instr in self._instr:
self._has_instruments = self._has_instruments or (instr.shape[1] > 1)
for i, comps in enumerate(
zip(self._dependent, self._exog, self._endog, self._instr, self._weights)
):
shapes = [a.shape[0] for a in comps]
if min(shapes) != max(shapes):
raise ValueError(
"Dependent, exogenous, endogenous and "
"instruments, and weights, if provided, do "
"not have the same number of observations in "
"{eq}".format(eq=self._eq_labels[i])
)
self._drop_missing()
self._common_exog = len(set(ids)) == 1
if self._common_exog:
# Common exog requires weights are also equal
w0 = self._weights[0].ndarray
for w in self._weights:
self._common_exog = self._common_exog and bool(np.all(w.ndarray == w0))
constant = []
constant_loc = []
exog_ivd: IVData
for dep, exog_ivd, endog, instr, w, label in zip(
self._dependent,
self._exog,
self._endog,
self._instr,
self._weights,
self._eq_labels,
):
y = cast(
linearmodels.typing.data.Float64Array,
dep.ndarray,
)
x = np.concatenate([exog_ivd.ndarray, endog.ndarray], 1, dtype=float)
z = np.concatenate([exog_ivd.ndarray, instr.ndarray], 1, dtype=float)
w_arr = cast(
linearmodels.typing.data.Float64Array,
w.ndarray,
)
w_arr = w_arr / np.nanmean(w_arr)
w_sqrt = np.sqrt(w_arr)
self._w.append(w_arr)
self._y.append(y)
self._x.append(x)
self._z.append(z)
self._wy.append(y * w_sqrt)
self._wx.append(x * w_sqrt)
self._wz.append(z * w_sqrt)
cols = list(exog_ivd.cols) + list(endog.cols)
self._param_names.extend([label + "_" + col for col in cols])
if y.shape[0] <= x.shape[1]:
raise ValueError(
"Fewer observations than variables in "
"equation {eq}".format(eq=label)
)
if matrix_rank(x) < x.shape[1]:
raise ValueError(
"Equation {eq} regressor array is not full " "rank".format(eq=label)
)
if x.shape[1] > z.shape[1]:
raise ValueError(
"Equation {eq} has fewer instruments than "
"endogenous variables.".format(eq=label)
)
if z.shape[1] > z.shape[0]:
raise ValueError(
"Fewer observations than instruments in "
"equation {eq}".format(eq=label)
)
if matrix_rank(z) < z.shape[1]:
raise ValueError(
"Equation {eq} instrument array is full " "rank".format(eq=label)
)
for rhs in self._x:
const, const_loc = has_constant(rhs)
constant.append(const)
constant_loc.append(const_loc)
self._has_constant = Series(
constant, index=[d.cols[0] for d in self._dependent]
)
self._constant_loc = constant_loc
def _drop_missing(self) -> None:
k = len(self._dependent)
nobs = self._dependent[0].shape[0]
self._original_index = Index(self._dependent[0].rows)
missing = np.zeros(nobs, dtype=bool)
values = [self._dependent, self._exog, self._endog, self._instr, self._weights]
for i in range(k):
for value in values:
nulls = value[i].isnull
if nulls.any():
missing |= np.asarray(nulls)
missing_warning(missing, stacklevel=4)
if np.any(missing):
for i in range(k):
self._dependent[i].drop(missing)
self._exog[i].drop(missing)
self._endog[i].drop(missing)
self._instr[i].drop(missing)
self._weights[i].drop(missing)
def __repr__(self) -> str:
return self.__str__() + f"\nid: {hex(id(self))}"
def __str__(self) -> str:
out = self._model_name + ", "
out += f"{len(self._y)} Equations:\n"
eqns = ", ".join(self._equations.keys())
out += "\n".join(textwrap.wrap(eqns, 70))
if self._common_exog:
out += "\nCommon Exogenous Variables"
return out
def predict(
self,
params: linearmodels.typing.data.ArrayLike,
*,
equations: (
Mapping[str, Mapping[str, linearmodels.typing.data.ArrayLike]] | None
) = None,
data: pandas.DataFrame | None = None,
eval_env: int = 1,
) -> DataFrame:
"""
Predict values for additional data
Parameters
----------
params : array_like
Model parameters (nvar by 1)
equations : dict
Dictionary-like structure containing exogenous and endogenous
variables. Each key is an equations label and must
match the labels used to fir the model. Each value must be a
dictionary with keys "exog" and "endog". If predictions are not
required for one of more of the model equations, these keys can
be omitted.
data : DataFrame
Values to use when making predictions from a model constructed
from a formula
eval_env : int
Depth to use when evaluating formulas.
Returns
-------
predictions : DataFrame
Fitted values from supplied data and parameters
Notes
-----
If `data` is not none, then `equations` must be none.
Predictions from models constructed using formulas can
be computed using either `equations`, which will treat these are
arrays of values corresponding to the formula-process data, or using
`data` which will be processed using the formula used to construct the
values corresponding to the original model specification.
When using `exog` and `endog`, the regressor array for a particular
equation is assembled as
`[equations[eqn]["exog"], equations[eqn]["endog"]]` where `eqn` is
an equation label. These must correspond to the columns in the
estimated model.
"""
if data is not None:
assert self.formula is not None
parser = SystemFormulaParser(
self.formula, data=data, context=capture_context(eval_env)
)
equations_d = parser.data
else:
if equations is None:
raise ValueError("One of equations or data must be provided.")
assert equations is not None
equations_d = {k: dict(v) for k, v in equations.items()}
params = np.atleast_2d(np.asarray(params))
if params.shape[0] == 1:
params = params.T
nx = int(sum(_x.shape[1] for _x in self._x))
if params.shape[0] != nx:
raise ValueError(
f"Parameters must have {nx} elements; found {params.shape[0]}."
)
loc = 0
out = dict()
for i, label in enumerate(self._eq_labels):
kx = self._x[i].shape[1]
if label in equations_d:
b = params[loc : loc + kx]
eqn = equations_d[label]
exog = eqn.get("exog", None)
endog = eqn.get("endog", None)
if exog is None and endog is None:
loc += kx
continue
if exog is not None:
exog_endog = IVData(exog).pandas
if endog is not None:
endog_ivd = IVData(endog)
exog_endog = concat([exog_endog, endog_ivd.pandas], axis=1)
else:
exog_endog = IVData(endog).pandas
fitted = np.asarray(exog_endog) @ b
fitted_df = DataFrame(fitted, index=exog_endog.index, columns=[label])
out[label] = fitted_df
loc += kx
out_df = reduce(
lambda left, right: left.merge(
right, how="outer", left_index=True, right_index=True
),
[out[key] for key in out],
)
return out_df
def _multivariate_ls_fit(
self,
) -> tuple[
linearmodels.typing.data.Float64Array,
linearmodels.typing.data.Float64Array,
]:
wy, wx, wxhat = self._wy, self._wx, self._wxhat
k = len(wxhat)
xpx = blocked_inner_prod(wxhat, np.eye(len(wxhat)))
_xpy = []
for i in range(k):
_xpy.append(wxhat[i].T @ wy[i])
xpy = np.vstack(_xpy)
beta = _parameters_from_xprod(xpx, xpy, constraints=self.constraints)
loc = 0
eps = []
for i in range(k):
nb = wx[i].shape[1]
b = beta[loc : loc + nb]
eps.append(wy[i] - wx[i] @ b)
loc += nb
eps_arr = np.hstack(eps)
return beta, eps_arr
def _construct_xhat(self) -> None:
k = len(self._x)
self._xhat = []
self._wxhat = []
for i in range(k):
x, z = self._x[i], self._z[i]
if z.shape == x.shape and np.all(z == x):
# OLS, no instruments
self._xhat.append(x)
self._wxhat.append(self._wx[i])
else:
delta = lstsq(z, x, rcond=None)[0]
xhat = z @ delta
self._xhat.append(xhat)
w = self._w[i]
self._wxhat.append(xhat * np.sqrt(w))
def _gls_estimate(
self,
eps: linearmodels.typing.data.Float64Array,
nobs: int,
total_cols: int,
ci: Sequence[int],
full_cov: bool,
debiased: bool,
) -> tuple[
linearmodels.typing.data.Float64Array,
linearmodels.typing.data.Float64Array,
linearmodels.typing.data.Float64Array,
linearmodels.typing.data.Float64Array,
]:
"""Core estimation routine for iterative GLS"""
wy, wx, wxhat = self._wy, self._wx, self._wxhat
if self._sigma is None:
sigma = eps.T @ eps / nobs
sigma *= self._sigma_scale(debiased)
else:
sigma = self._sigma
est_sigma = sigma
if not full_cov:
sigma = np.diag(np.diag(sigma))
sigma_inv = cast(linearmodels.typing.data.Float64Array, inv(sigma))
k = len(wy)
xpx = blocked_inner_prod(wxhat, sigma_inv)
xpy = np.zeros((total_cols, 1))
for i in range(k):
sy = np.zeros((nobs, 1))
for j in range(k):
sy += sigma_inv[i, j] * wy[j]
xpy[ci[i] : ci[i + 1]] = wxhat[i].T @ sy
beta = _parameters_from_xprod(xpx, xpy, constraints=self.constraints)
loc = 0
for j in range(k):
_wx = wx[j]
_wy = wy[j]
kx = _wx.shape[1]
eps[:, [j]] = _wy - _wx @ beta[loc : loc + kx]
loc += kx
return beta, eps, sigma, est_sigma
def _multivariate_ls_finalize(
self,
beta: linearmodels.typing.data.Float64Array,
eps: linearmodels.typing.data.Float64Array,
sigma: linearmodels.typing.data.Float64Array,
cov_type: str,
**cov_config: bool,
) -> SystemResults:
k = len(self._wx)
# Covariance estimation
cov_estimator = COV_EST[cov_type]
cov_est = cov_estimator(
self._wxhat,
eps,
sigma,
sigma,
gls=False,
constraints=self._constraints,
**cov_config,
)
cov = cov_est.cov
individual = AttrDict()
debiased = cov_config.get("debiased", False)
for i in range(k):
wy = wye = self._wy[i]
w = self._w[i]
cons = bool(self.has_constant.iloc[i])
if cons:
wc = np.ones_like(wy) * np.sqrt(w)
wye = wy - wc @ lstsq(wc, wy, rcond=None)[0]
total_ss = float(np.squeeze(wye.T @ wye))
stats = self._common_indiv_results(
i,
beta,
cov,
eps,
eps,
"OLS",
cov_type,
cov_est,
0,
debiased,
cons,
total_ss,
)
key = self._eq_labels[i]
individual[key] = stats
nobs = eps.size
results = self._common_results(
beta, cov, "OLS", 0, nobs, cov_type, sigma, individual, debiased
)
results["wresid"] = results.resid
results["cov_estimator"] = cov_est
results["cov_config"] = cov_est.cov_config
individual = results["individual"]
r2s = [individual[eq].r2 for eq in individual]
results["system_r2"] = self._system_r2(eps, sigma, "ols", False, debiased, r2s)
return SystemResults(results)
@property
def has_constant(self) -> Series:
"""Vector indicating which equations contain constants"""
return self._has_constant
def _f_stat(
self, stats: AttrDict, debiased: bool
) -> WaldTestStatistic | InvalidTestStatistic:
cov = stats.cov
k = cov.shape[0]
sel = list(range(k))
if stats.has_constant:
sel.pop(stats.constant_loc)
cov = cov[sel][:, sel]
params = stats.params[sel]
df = params.shape[0]
nobs = stats.nobs
null = "All parameters ex. constant are zero"
name = "Equation F-statistic"
try:
stat = float(np.squeeze(params.T @ inv(cov) @ params))
except np.linalg.LinAlgError:
return InvalidTestStatistic(
"Covariance is singular, possibly due " "to constraints.", name=name
)
if debiased:
total_reg = np.sum([s.shape[1] for s in self._wx])
df_denom = len(self._wx) * nobs - total_reg
wald = WaldTestStatistic(stat / df, null, df, df_denom=df_denom, name=name)
else:
return WaldTestStatistic(stat, null=null, df=df, name=name)
return wald
def _common_indiv_results(
self,
index: int,
beta: linearmodels.typing.data.Float64Array,
cov: linearmodels.typing.data.Float64Array,
wresid: linearmodels.typing.data.Float64Array,
resid: linearmodels.typing.data.Float64Array,
method: str,
cov_type: str,
cov_est: (
HomoskedasticCovariance
| HeteroskedasticCovariance
| KernelCovariance
| ClusteredCovariance
| GMMHeteroskedasticCovariance
| GMMHomoskedasticCovariance
),
iter_count: int,
debiased: bool,
constant: bool,
total_ss: float,
*,
weight_est: None | (
HomoskedasticWeightMatrix | HeteroskedasticWeightMatrix | KernelWeightMatrix
) = None,
) -> AttrDict:
loc = 0
for i in range(index):
loc += self._wx[i].shape[1]
i = index
stats = AttrDict()
# Static properties
stats["eq_label"] = self._eq_labels[i]
stats["dependent"] = self._dependent[i].cols[0]
stats["instruments"] = (
self._instr[i].cols if self._instr[i].shape[1] > 0 else None
)
stats["endog"] = self._endog[i].cols if self._endog[i].shape[1] > 0 else None
stats["method"] = method
stats["cov_type"] = cov_type
stats["cov_estimator"] = cov_est
stats["cov_config"] = cov_est.cov_config
stats["weight_estimator"] = weight_est
stats["index"] = self._dependent[i].rows
stats["original_index"] = self._original_index
stats["iter"] = iter_count
stats["debiased"] = debiased
stats["has_constant"] = constant
assert self._constant_loc is not None
stats["constant_loc"] = self._constant_loc[i]
# Parameters, errors and measures of fit
wxi = self._wx[i]
nobs, df = wxi.shape
b = beta[loc : loc + df]
e = wresid[:, [i]]
nobs = e.shape[0]
df_c = nobs - int(constant)
df_r = nobs - df
stats["params"] = b
stats["cov"] = cov[loc : loc + df, loc : loc + df]
stats["wresid"] = e
stats["nobs"] = nobs
stats["df_model"] = df
stats["resid"] = resid[:, [i]]
stats["fitted"] = self._x[i] @ b
stats["resid_ss"] = float(np.squeeze(resid[:, [i]].T @ resid[:, [i]]))
stats["total_ss"] = total_ss
stats["r2"] = 1.0 - stats.resid_ss / stats.total_ss
stats["r2a"] = 1.0 - (stats.resid_ss / df_r) / (stats.total_ss / df_c)
names = self._param_names[loc : loc + df]
offset = len(stats.eq_label) + 1
stats["param_names"] = [n[offset:] for n in names]
# F-statistic
stats["f_stat"] = self._f_stat(stats, debiased)
return stats
def _common_results(
self,
beta: linearmodels.typing.data.Float64Array,
cov: linearmodels.typing.data.Float64Array,
method: str,
iter_count: int,
nobs: int,
cov_type: str,
sigma: linearmodels.typing.data.Float64Array,
individual: AttrDict,
debiased: bool,
) -> AttrDict:
results = AttrDict()
results["method"] = method
results["iter"] = iter_count
results["nobs"] = nobs
results["cov_type"] = cov_type
results["index"] = self._dependent[0].rows
results["original_index"] = self._original_index
names = list(individual.keys())
results["sigma"] = DataFrame(sigma, columns=names, index=names)
results["individual"] = individual
results["params"] = beta
results["df_model"] = beta.shape[0]
results["param_names"] = self._param_names
results["cov"] = cov
results["debiased"] = debiased
total_ss = resid_ss = 0.0
residuals = []
for key in individual:
total_ss += individual[key].total_ss
resid_ss += individual[key].resid_ss
residuals.append(individual[key].resid)
resid = np.hstack(residuals)
results["resid_ss"] = resid_ss
results["total_ss"] = total_ss
results["r2"] = 1.0 - results.resid_ss / results.total_ss
results["resid"] = resid
results["constraints"] = self._constraints
results["model"] = self
x = self._x
k = len(x)
loc = 0
fitted_vals = []
for i in range(k):
nb = x[i].shape[1]
b = beta[loc : loc + nb]
fitted_vals.append(x[i] @ b)
loc += nb
fitted = np.hstack(fitted_vals)
results["fitted"] = fitted
return results
def _system_r2(
self,
eps: linearmodels.typing.data.Float64Array,
sigma: linearmodels.typing.data.Float64Array,
method: str,
full_cov: bool,
debiased: bool,
r2s: Sequence[float],
) -> Series:
sigma_resid = sigma
# System regression on a constant using weights if provided
wy, w = self._wy, self._w
wi = [
cast(linearmodels.typing.data.Float64Array, np.sqrt(weights))
for weights in w
]
if method == "ols":
est_sigma = np.eye(len(wy))
else: # gls
est_sigma = sigma
if not full_cov:
est_sigma = np.diag(np.diag(est_sigma))
est_sigma_inv = cast(linearmodels.typing.data.Float64Array, inv(est_sigma))
nobs = wy[0].shape[0]
k = len(wy)
xpx = blocked_inner_prod(wi, est_sigma_inv)
xpy = np.zeros((k, 1))
for i in range(k):
sy = np.zeros((nobs, 1))
for j in range(k):
sy += est_sigma_inv[i, j] * wy[j]
xpy[i : (i + 1)] = wi[i].T @ sy
mu = _parameters_from_xprod(xpx, xpy)
eps_const = np.hstack([self._y[j] - mu[j] for j in range(k)])
# Judge
judge = 1 - (eps**2).sum() / (eps_const**2).sum()
# Dhrymes
tot_eps_const_sq = (eps_const**2).sum(0)
r2s_arr = np.asarray(r2s)
dhrymes = (r2s_arr * tot_eps_const_sq).sum() / tot_eps_const_sq.sum()
# Berndt
sigma_y = (eps_const.T @ eps_const / nobs) * self._sigma_scale(debiased)
berndt = np.nan
# Avoid division by 0
if np.linalg.det(sigma_y) > 0:
berndt = 1 - np.linalg.det(sigma_resid) / np.linalg.det(sigma_y)
mcelroy = np.nan
# Check that the matrix is invertible
if np.linalg.matrix_rank(sigma) == sigma.shape[0]:
# McElroy
sigma_m12 = inv_matrix_sqrt(sigma)
std_eps = eps @ sigma_m12
numerator = (std_eps**2).sum()
std_eps_const = eps_const @ sigma_m12
denom = (std_eps_const**2).sum()
mcelroy = 1.0 - numerator / denom
r2 = dict(mcelroy=mcelroy, berndt=berndt, judge=judge, dhrymes=dhrymes)
return Series(r2)
def _gls_finalize(
self,
beta: linearmodels.typing.data.Float64Array,
sigma: linearmodels.typing.data.Float64Array,
full_sigma: linearmodels.typing.data.Float64Array,
est_sigma: linearmodels.typing.data.Float64Array,
gls_eps: linearmodels.typing.data.Float64Array,
eps: linearmodels.typing.data.Float64Array,
full_cov: bool,
cov_type: str,
iter_count: int,
**cov_config: bool,
) -> SystemResults:
"""Collect results to return after GLS estimation"""
k = len(self._wy)
# Covariance estimation
cov_estimator = COV_EST[cov_type]
gls_eps = np.reshape(gls_eps, (k, gls_eps.shape[0] // k)).T
eps = np.reshape(eps, (k, eps.shape[0] // k)).T
cov_est = cov_estimator(
self._wxhat,
eps,
sigma,
full_sigma,
gls=True,
constraints=self._constraints,
**cov_config,
)
cov = cov_est.cov
# Repackage results for individual equations
individual = AttrDict()
debiased = cov_config.get("debiased", False)
method = "Iterative GLS" if iter_count > 1 else "GLS"
for i in range(k):
cons = bool(self.has_constant.iloc[i])
if cons:
c = np.sqrt(self._w[i])
ye = self._wy[i] - c @ lstsq(c, self._wy[i], rcond=None)[0]
else:
ye = self._wy[i]
total_ss = float(np.squeeze(ye.T @ ye))
stats = self._common_indiv_results(
i,
beta,
cov,
gls_eps,
eps,
method,
cov_type,
cov_est,
iter_count,
debiased,
cons,
total_ss,
)
key = self._eq_labels[i]
individual[key] = stats
# Populate results dictionary
nobs = eps.size
results = self._common_results(
beta,
cov,
method,
iter_count,
nobs,
cov_type,
est_sigma,
individual,
debiased,
)
# wresid is different between GLS and OLS
wresiduals = []
for individual_key in individual:
wresiduals.append(individual[individual_key].wresid)
wresid = np.hstack(wresiduals)
results["wresid"] = wresid
results["cov_estimator"] = cov_est
results["cov_config"] = cov_est.cov_config
individual = results["individual"]
r2s = [individual[eq].r2 for eq in individual]
results["system_r2"] = self._system_r2(
eps, sigma, "gls", full_cov, debiased, r2s
)
return SystemResults(results)
def _sigma_scale(
self, debiased: bool
) -> float | linearmodels.typing.data.Float64Array:
if not debiased:
return 1.0
nobs = float(self._wx[0].shape[0])
scales = np.array([nobs - x.shape[1] for x in self._wx], dtype=np.float64)
scales = cast(linearmodels.typing.data.Float64Array, np.sqrt(nobs / scales))
return scales[:, None] @ scales[None, :]
@property
def constraints(self) -> LinearConstraint | None:
"""
Model constraints
Returns
-------
cons : {LinearConstraint, None}
Constraint object
"""
return self._constraints
def add_constraints(
self, r: pandas.DataFrame, q: pandas.Series | None = None
) -> None:
r"""
Add parameter constraints to a model.
Parameters
----------
r : DataFrame
Constraint matrix. nconstraints by nparameters
q : Series
Constraint values (nconstraints). If not set, set to 0
Notes
-----
Constraints are of the form
.. math ::
r \beta = q
The property `param_names` can be used to determine the order of
parameters.
"""
self._constraints = LinearConstraint(
r, q=q, num_params=len(self._param_names), require_pandas=True
)
def reset_constraints(self) -> None:
"""Remove all model constraints"""
self._constraints = None
@property
def param_names(self) -> list[str]:
"""
Model parameter names
Returns
-------
names : list[str]
Normalized, unique model parameter names
"""
return self._param_names
class _LSSystemModelBase(_SystemModelBase):
"""Base class for least-squares-based system estimators"""
def fit(
self,
*,
method: Literal["ols", "gls", None] = None,
full_cov: bool = True,
iterate: bool = False,
iter_limit: int = 100,
tol: float = 1e-6,
cov_type: str = "robust",
**cov_config: bool,
) -> SystemResults:
"""
Estimate model parameters
Parameters
----------
method : {None, "gls", "ols"}
Estimation method. Default auto selects based on regressors,
using OLS only if all regressors are identical. The other two
arguments force the use of GLS or OLS.
full_cov : bool
Flag indicating whether to utilize information in correlations
when estimating the model with GLS
iterate : bool
Flag indicating to iterate GLS until convergence of iter limit
iterations have been completed
iter_limit : int
Maximum number of iterations for iterative GLS
tol : float
Tolerance to use when checking for convergence in iterative GLS
cov_type : str
Name of covariance estimator. Valid options are
* "unadjusted", "homoskedastic" - Classic covariance estimator
* "robust", "heteroskedastic" - Heteroskedasticity robust
covariance estimator
* "kernel" - Allows for heteroskedasticity and autocorrelation
* "clustered" - Allows for 1 and 2-way clustering of errors
(Rogers).
cov_config
Additional parameters to pass to covariance estimator. All
estimators support debiased which employs a small-sample adjustment
Returns
-------
results : SystemResults
Estimation results
See Also
--------
linearmodels.system.covariance.HomoskedasticCovariance
linearmodels.system.covariance.HeteroskedasticCovariance
linearmodels.system.covariance.KernelCovariance
linearmodels.system.covariance.ClusteredCovariance
"""
if method is None:
method = (
"ols" if (self._common_exog and self._constraints is None) else "gls"
)
else:
if method.lower() not in ("ols", "gls"):
raise ValueError(
f"method must be 'ols' or 'gls' when not None. Got {method}."
)
method = cast(Literal["ols", "gls"], method.lower())
cov_type = cov_type.lower()
if cov_type not in COV_TYPES:
raise ValueError(f"Unknown cov_type: {cov_type}")
cov_type = COV_TYPES[cov_type]
k = len(self._dependent)
col_sizes = [0] + [v.shape[1] for v in self._x]
col_idx = [int(i) for i in np.cumsum(col_sizes)]
total_cols = col_idx[-1]
self._construct_xhat()
beta, eps = self._multivariate_ls_fit()
nobs = eps.shape[0]
debiased = cov_config.get("debiased", False)
full_sigma = sigma = (eps.T @ eps / nobs) * self._sigma_scale(debiased)
if method == "ols":
return self._multivariate_ls_finalize(
beta, eps, sigma, cov_type, **cov_config
)
beta_hist = [beta]
nobs = eps.shape[0]
iter_count = 0
delta = np.inf
while (
(iter_count < iter_limit and iterate) or iter_count == 0
) and delta >= tol:
beta, eps, sigma, est_sigma = self._gls_estimate(
eps, nobs, total_cols, col_idx, full_cov, debiased
)
beta_hist.append(beta)
diff = beta_hist[-1] - beta_hist[-2]
delta = float(np.sqrt(np.mean(diff**2)))
iter_count += 1
sigma_m12 = inv_matrix_sqrt(sigma)
wy = blocked_column_product(self._wy, sigma_m12)
wx = blocked_diag_product(self._wx, sigma_m12)
gls_eps = wy - wx @ beta
y = blocked_column_product(self._y, np.eye(k))
x = blocked_diag_product(self._x, np.eye(k))
eps = y - x @ beta
return self._gls_finalize(
beta,
sigma,
full_sigma,
est_sigma,
gls_eps,
eps,
full_cov,
cov_type,
iter_count,
**cov_config,
)
[docs]
class IV3SLS(_LSSystemModelBase):
r"""
Three-stage Least Squares (3SLS) Estimator
Parameters
----------
equations : dict
Dictionary-like structure containing dependent, exogenous, endogenous
and instrumental variables. Each key is an equations label and must
be a string. Each value must be either a tuple of the form (dependent,
exog, endog, instrument[, weights]) or a dictionary with keys "dependent",
and at least one of "exog" or "endog" and "instruments". When using a
tuple, values must be provided for all 4 variables, although either
empty arrays or `None` can be passed if a category of variable is not
included in a model. The dictionary may contain optional keys for
"exog", "endog", "instruments", and "weights". "exog" can be omitted
if all variables in an equation are endogenous. Alternatively, "exog"
can contain either an empty array or `None` to indicate that an
equation contains no exogenous regressors. Similarly "endog" and
"instruments" can either be omitted or may contain an empty array (or
`None`) if all variables in an equation are exogenous.
sigma : array_like
Prespecified residual covariance to use in GLS estimation. If not
provided, FGLS is implemented based on an estimate of sigma.
Notes
-----
Estimates a set of regressions which are seemingly unrelated in the sense
that separate estimation would lead to consistent parameter estimates.
Each equation is of the form
.. math::
y_{i,k} = x_{i,k}\beta_i + \epsilon_{i,k}
where k denotes the equation and i denoted the observation index. By
stacking vertically arrays of dependent and placing the exogenous
variables into a block diagonal array, the entire system can be compactly
expressed as
.. math::
Y = X\beta + \epsilon
where
.. math::
Y = \left[\begin{array}{x}Y_1 \\ Y_2 \\ \vdots \\ Y_K\end{array}\right]
and
.. math::
X = \left[\begin{array}{cccc}
X_1 & 0 & \ldots & 0 \\
0 & X_2 & \dots & 0 \\
\vdots & \vdots & \ddots & \vdots \\
0 & 0 & \dots & X_K
\end{array}\right]
The system instrumental variable (IV) estimator is
.. math::
\hat{\beta}_{IV} & = (X'Z(Z'Z)^{-1}Z'X)^{-1}X'Z(Z'Z)^{-1}Z'Y \\
& = (\hat{X}'\hat{X})^{-1}\hat{X}'Y
where :math:`\hat{X} = Z(Z'Z)^{-1}Z'X` and. When certain conditions are
satisfied, a GLS estimator of the form
.. math::
\hat{\beta}_{3SLS} = (\hat{X}'\Omega^{-1}\hat{X})^{-1}\hat{X}'\Omega^{-1}Y
can improve accuracy of coefficient estimates where
.. math::
\Omega = \Sigma \otimes I_N
where :math:`\Sigma` is the covariance matrix of the residuals.
"""
def __init__(
self,
equations: Mapping[
str,
Mapping[str, linearmodels.typing.data.ArrayLike | None]
| Sequence[linearmodels.typing.data.ArrayLike | None],
],
*,
sigma: linearmodels.typing.data.ArrayLike | None = None,
) -> None:
super().__init__(equations, sigma=sigma)
[docs]
@classmethod
def multivariate_iv(
cls,
dependent: linearmodels.typing.data.ArrayLike,
exog: linearmodels.typing.data.ArrayLike | None = None,
endog: linearmodels.typing.data.ArrayLike | None = None,
instruments: linearmodels.typing.data.ArrayLike | None = None,
) -> IV3SLS:
"""
Interface for specification of multivariate IV models
Parameters
----------
dependent : array_like
nobs by ndep array of dependent variables
exog : array_like
nobs by nexog array of exogenous regressors common to all models
endog : array_like
nobs by nendog array of endogenous regressors common to all models
instruments : array_like
nobs by ninstr array of instruments to use in all equations
Returns
-------
model : IV3SLS
Model instance
Notes
-----
At least one of exog or endog must be provided.
Utility function to simplify the construction of multivariate IV
models which all use the same regressors and instruments. Constructs
the dictionary of equations from the variables using the common
exogenous, endogenous and instrumental variables.
"""
equations = {}
dependent_ivd = IVData(dependent, var_name="dependent")
if exog is None and endog is None:
raise ValueError("At least one of exog or endog must be provided")
exog_ivd = IVData(exog, var_name="exog")
endog_ivd = IVData(endog, var_name="endog", nobs=dependent.shape[0])
instr_ivd = IVData(instruments, var_name="instruments", nobs=dependent.shape[0])
for col in dependent_ivd.pandas:
equations[str(col)] = {
# TODO: Bug in pandas-stubs
# https://github.com/pandas-dev/pandas-stubs/issues/97
"dependent": dependent_ivd.pandas[[col]],
"exog": exog_ivd.pandas,
"endog": endog_ivd.pandas,
"instruments": instr_ivd.pandas,
}
return cls(equations)
[docs]
class SUR(_LSSystemModelBase):
r"""
Seemingly unrelated regression estimation (SUR/SURE)
Parameters
----------
equations : dict
Dictionary-like structure containing dependent and exogenous variable
values. Each key is an equations label and must be a string. Each
value must be either a tuple of the form (dependent,
exog, [weights]) or a dictionary with keys "dependent" and "exog" and
the optional key "weights".
sigma : array_like
Prespecified residual covariance to use in GLS estimation. If not
provided, FGLS is implemented based on an estimate of sigma.
Notes
-----
Estimates a set of regressions which are seemingly unrelated in the sense
that separate estimation would lead to consistent parameter estimates.
Each equation is of the form
.. math::
y_{i,k} = x_{i,k}\beta_i + \epsilon_{i,k}
where k denotes the equation and i denoted the observation index. By
stacking vertically arrays of dependent and placing the exogenous
variables into a block diagonal array, the entire system can be compactly
expressed as
.. math::
Y = X\beta + \epsilon
where
.. math::
Y = \left[\begin{array}{x}Y_1 \\ Y_2 \\ \vdots \\ Y_K\end{array}\right]
and
.. math::
X = \left[\begin{array}{cccc}
X_1 & 0 & \ldots & 0 \\
0 & X_2 & \dots & 0 \\
\vdots & \vdots & \ddots & \vdots \\
0 & 0 & \dots & X_K
\end{array}\right]
The system OLS estimator is
.. math::
\hat{\beta}_{OLS} = (X'X)^{-1}X'Y
When certain conditions are satisfied, a GLS estimator of the form
.. math::
\hat{\beta}_{GLS} = (X'\Omega^{-1}X)^{-1}X'\Omega^{-1}Y
can improve accuracy of coefficient estimates where
.. math::
\Omega = \Sigma \otimes I_N
where :math:`\Sigma` is the covariance matrix of the residuals.
SUR is a special case of 3SLS where there are no endogenous regressors and
no instruments.
"""
def __init__(
self,
equations: Mapping[
str,
Mapping[str, linearmodels.typing.data.ArrayLike | None]
| Sequence[linearmodels.typing.data.ArrayLike | None],
],
*,
sigma: linearmodels.typing.data.ArrayLike | None = None,
) -> None:
if not isinstance(equations, Mapping):
raise TypeError("equations must be a dictionary-like")
for key in equations:
if not isinstance(key, str):
raise ValueError("Equation labels (keys) must be strings")
reformatted = {}
for key in equations:
eqn = equations[key]
if isinstance(eqn, tuple):
if len(eqn) == 3:
w = eqn[-1]
eqn = eqn[:2]
eqn = eqn + (None, None) + (w,)
else:
eqn = eqn + (None, None)
reformatted[key] = eqn
super().__init__(reformatted, sigma=sigma)
self._model_name = "Seemingly Unrelated Regression (SUR)"
[docs]
@classmethod
def multivariate_ls(
cls,
dependent: linearmodels.typing.data.ArrayLike,
exog: linearmodels.typing.data.ArrayLike,
) -> SUR:
"""
Interface for specification of multivariate regression models
Parameters
----------
dependent : array_like
nobs by ndep array of dependent variables
exog : array_like
nobs by nvar array of exogenous regressors common to all models
Returns
-------
model : SUR
Model instance
Notes
-----
Utility function to simplify the construction of multivariate
regression models which all use the same regressors. Constructs
the dictionary of equations from the variables using the common
exogenous variable.
Examples
--------
A simple CAP-M can be estimated as a multivariate regression
>>> from linearmodels.datasets import french
>>> from linearmodels.system import SUR
>>> data = french.load()
>>> portfolios = data[["S1V1","S1V5","S5V1","S5V5"]]
>>> factors = data[["MktRF"]].copy()
>>> factors["alpha"] = 1
>>> mod = SUR.multivariate_ls(portfolios, factors)
"""
equations = {}
dependent_ivd = IVData(dependent, var_name="dependent")
exog_ivd = IVData(exog, var_name="exog")
for col in dependent_ivd.pandas:
# TODO: Bug in pandas-stubs
# https://github.com/pandas-dev/pandas-stubs/issues/97
equations[str(col)] = (dependent_ivd.pandas[[col]], exog_ivd.pandas)
return cls(equations)
[docs]
class IVSystemGMM(_SystemModelBase):
r"""
System Generalized Method of Moments (GMM) estimation of linear IV models
Parameters
----------
equations : dict
Dictionary-like structure containing dependent, exogenous, endogenous
and instrumental variables. Each key is an equations label and must
be a string. Each value must be either a tuple of the form (dependent,
exog, endog, instrument[, weights]) or a dictionary with keys "dependent",
"exog". The dictionary may contain optional keys for "endog",
"instruments", and "weights". Endogenous and/or Instrument can be empty
if all variables in an equation are exogenous.
sigma : array_like
Prespecified residual covariance to use in GLS estimation. If not
provided, FGLS is implemented based on an estimate of sigma. Only used
if weight_type is "unadjusted"
weight_type : str
Name of moment condition weight function to use in the GMM estimation
weight_config
Additional keyword arguments to pass to the moment condition weight
function
Notes
-----
Estimates a linear model using GMM. Each equation is of the form
.. math::
y_{i,k} = x_{i,k}\beta_i + \epsilon_{i,k}
where k denotes the equation and i denoted the observation index. By
stacking vertically arrays of dependent and placing the exogenous
variables into a block diagonal array, the entire system can be compactly
expressed as
.. math::
Y = X\beta + \epsilon
where
.. math::
Y = \left[\begin{array}{x}Y_1 \\ Y_2 \\ \vdots \\ Y_K\end{array}\right]
and
.. math::
X = \left[\begin{array}{cccc}
X_1 & 0 & \ldots & 0 \\
0 & X_2 & \dots & 0 \\
\vdots & \vdots & \ddots & \vdots \\
0 & 0 & \dots & X_K
\end{array}\right]
The system GMM estimator uses the moment condition
.. math::
z_{ij}(y_{ij} - x_{ij}\beta_j) = 0
where j indexes the equation. The estimator for the coefficients is given
by
.. math::
\hat{\beta}_{GMM} & = (X'ZW^{-1}Z'X)^{-1}X'ZW^{-1}Z'Y \\
where :math:`W` is a positive definite weighting matrix.
"""
def __init__(
self,
equations: Mapping[
str,
Mapping[str, linearmodels.typing.data.ArrayLike | None]
| Sequence[linearmodels.typing.data.ArrayLike | None],
],
*,
sigma: linearmodels.typing.data.ArrayLike | None = None,
weight_type: str = "robust",
**weight_config: bool | str | float,
) -> None:
super().__init__(equations, sigma=sigma)
self._weight_type = weight_type
self._weight_config = weight_config
if weight_type not in COV_TYPES:
raise ValueError("Unknown estimator for weight_type")
if weight_type not in ("unadjusted", "homoskedastic") and sigma is not None:
warnings.warn(
"sigma has been provided but the estimated weight "
"matrix not unadjusted (homoskedastic). sigma will "
"be ignored.",
UserWarning,
)
weight_type = COV_TYPES[weight_type]
self._weight_est = GMM_W_EST[weight_type](**weight_config)
[docs]
def fit(
self,
*,
iter_limit: int = 2,
tol: float = 1e-6,
initial_weight: linearmodels.typing.data.Float64Array | None = None,
cov_type: str = "robust",
**cov_config: bool | float,
) -> GMMSystemResults:
"""
Estimate model parameters
Parameters
----------
iter_limit : int
Maximum number of iterations for iterative GLS
tol : float
Tolerance to use when checking for convergence in iterative GLS
initial_weight : ndarray
Initial weighting matrix to use in the first step. If not
specified, uses the average outer-product of the set containing
the exogenous variables and instruments.
cov_type : str
Name of covariance estimator. Valid options are
* "unadjusted", "homoskedastic" - Classic covariance estimator
* "robust", "heteroskedastic" - Heteroskedasticity robust
covariance estimator
cov_config
Additional parameters to pass to covariance estimator. All
estimators support debiased which employs a small-sample adjustment
Returns
-------
GMMSystemResults
Estimation results
"""
if cov_type not in COV_TYPES:
raise ValueError(f"Unknown cov_type: {cov_type}")
# Parameter estimation
wx, wy, wz = self._wx, self._wy, self._wz
k = len(wx)
nobs = wx[0].shape[0]
k_total = sum(map(lambda a: a.shape[1], wz))
if initial_weight is None:
w = blocked_inner_prod(wz, np.eye(k_total)) / nobs
else:
w = initial_weight
assert w is not None
beta_last = beta = self._blocked_gmm(
wx,
wy,
wz,
w=cast(linearmodels.typing.data.Float64Array, w),
constraints=self.constraints,
)
_eps = []
loc = 0
for i in range(k):
nb = wx[i].shape[1]
b = beta[loc : loc + nb]
_eps.append(wy[i] - wx[i] @ b)
loc += nb
eps = np.hstack(_eps)
sigma = self._weight_est.sigma(eps, wx) if self._sigma is None else self._sigma
vinv = None
iters = 1
norm = 10 * tol + 1
while iters < iter_limit and norm > tol:
sigma = (
self._weight_est.sigma(eps, wx) if self._sigma is None else self._sigma
)
w = self._weight_est.weight_matrix(wx, wz, eps, sigma=sigma)
beta = self._blocked_gmm(
wx,
wy,
wz,
w=cast(linearmodels.typing.data.Float64Array, w),
constraints=self.constraints,
)
delta = beta_last - beta
if vinv is None:
winv = np.linalg.inv(w)
xpz = blocked_cross_prod(wx, wz, np.eye(k))
xpz = cast(linearmodels.typing.data.Float64Array, xpz / nobs)
v = (xpz @ winv @ xpz.T) / nobs
vinv = inv(v)
norm = float(np.squeeze(delta.T @ vinv @ delta))
beta_last = beta
_eps = []
loc = 0
for i in range(k):
nb = wx[i].shape[1]
b = beta[loc : loc + nb]
_eps.append(wy[i] - wx[i] @ b)
loc += nb
eps = np.hstack(_eps)
iters += 1
cov_type = COV_TYPES[cov_type]
cov_est = GMM_COV_EST[cov_type]
cov = cov_est(
wx, wz, eps, w, sigma=sigma, constraints=self._constraints, **cov_config
)
weps = eps
_eps = []
loc = 0
x, y = self._x, self._y
for i in range(k):
nb = x[i].shape[1]
b = beta[loc : loc + nb]
_eps.append(y[i] - x[i] @ b)
loc += nb
eps = np.hstack(_eps)
iters += 1
return self._finalize_results(
beta,
cov.cov,
weps,
eps,
cast(np.ndarray, w),
sigma,
iters - 1,
cov_type,
cov_config,
cov,
)
@staticmethod
def _blocked_gmm(
x: linearmodels.typing.ArraySequence,
y: linearmodels.typing.ArraySequence,
z: linearmodels.typing.ArraySequence,
*,
w: linearmodels.typing.data.Float64Array,
constraints: LinearConstraint | None = None,
) -> linearmodels.typing.data.Float64Array:
k = len(x)
xpz = blocked_cross_prod(x, z, np.eye(k))
wi = np.linalg.inv(w)
xpz_wi_zpx = xpz @ wi @ xpz.T
zpy_arrs = []
for i in range(k):
zpy_arrs.append(z[i].T @ y[i])
zpy = np.vstack(zpy_arrs)
xpz_wi_zpy = xpz @ wi @ zpy
params = _parameters_from_xprod(xpz_wi_zpx, xpz_wi_zpy, constraints=constraints)
return params
def _finalize_results(
self,
beta: linearmodels.typing.data.Float64Array,
cov: linearmodels.typing.data.Float64Array,
weps: linearmodels.typing.data.Float64Array,
eps: linearmodels.typing.data.Float64Array,
wmat: linearmodels.typing.data.Float64Array,
sigma: linearmodels.typing.data.Float64Array,
iter_count: int,
cov_type: str,
cov_config: dict[str, bool | float],
cov_est: GMMHeteroskedasticCovariance | GMMHomoskedasticCovariance,
) -> GMMSystemResults:
"""Collect results to return after GLS estimation"""
k = len(self._wy)
# Repackage results for individual equations
individual = AttrDict()
debiased = bool(cov_config.get("debiased", False))
method = f"{iter_count}-Step System GMM"
if iter_count > 2:
method = "Iterative System GMM"
for i in range(k):
cons = bool(self.has_constant.iloc[i])
if cons:
c = np.sqrt(self._w[i])
ye = self._wy[i] - c @ lstsq(c, self._wy[i], rcond=None)[0]
else:
ye = self._wy[i]
total_ss = float(np.squeeze(ye.T @ ye))
stats = self._common_indiv_results(
i,
beta,
cov,
weps,
eps,
method,
cov_type,
cov_est,
iter_count,
debiased,
cons,
total_ss,
weight_est=self._weight_est,
)
key = self._eq_labels[i]
individual[key] = stats
# Populate results dictionary
nobs = eps.size
results = self._common_results(
beta, cov, method, iter_count, nobs, cov_type, sigma, individual, debiased
)
# wresid is different between GLS and OLS
wresiduals = []
for individual_key in individual:
wresiduals.append(individual[individual_key].wresid)
wresid = np.hstack(wresiduals)
results["wresid"] = wresid
results["wmat"] = wmat
results["weight_type"] = self._weight_type
results["weight_config"] = self._weight_est.config
results["cov_estimator"] = cov_est
results["cov_config"] = cov_est.cov_config
results["weight_estimator"] = self._weight_est
results["j_stat"] = self._j_statistic(beta, wmat)
r2s = [individual[eq].r2 for eq in individual]
results["system_r2"] = self._system_r2(eps, sigma, "gls", False, debiased, r2s)
return GMMSystemResults(results)
def _j_statistic(
self,
params: linearmodels.typing.data.Float64Array,
weight_mat: linearmodels.typing.data.Float64Array,
) -> WaldTestStatistic:
"""
J stat and test
Parameters
----------
params : ndarray
Estimated model parameters
weight_mat : ndarray
Weighting matrix used in estimation of the parameters
Returns
-------
stat : WaldTestStatistic
Test statistic
Notes
-----
Assumes that the efficient weighting matrix has been used. Using
other weighting matrices will not produce the correct test.
"""
y, x, z = self._wy, self._wx, self._wz
k = len(x)
ze_lst = []
idx = 0
for i in range(k):
kx = x[i].shape[1]
beta = params[idx : idx + kx]
eps = y[i] - x[i] @ beta
ze_lst.append(z[i] * eps)
idx += kx
ze = np.concatenate(ze_lst, 1)
g_bar = ze.mean(0)
nobs = x[0].shape[0]
stat = float(nobs * g_bar.T @ np.linalg.inv(weight_mat) @ g_bar.T)
null = "Expected moment conditions are equal to 0"
ninstr = sum(map(lambda a: a.shape[1], z))
nvar = sum(map(lambda a: a.shape[1], x))
ncons = 0 if self.constraints is None else self.constraints.r.shape[0]
return WaldTestStatistic(stat, null, ninstr - (nvar - ncons))