# Source code for linearmodels.utility

from linearmodels.compat.pandas import concat
from linearmodels.compat.statsmodels import Summary

from collections.abc import MutableMapping
from typing import (
    AbstractSet,
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    ValuesView,
)

import numpy as np
from numpy.linalg import lstsq
from pandas import DataFrame, MultiIndex, Series
from patsy.design_info import DesignInfo
from scipy.stats import chi2, f
from statsmodels.iolib.summary import SimpleTable, fmt_params

from linearmodels.typing import ArrayLike, Label, NDArray, OptionalArrayLike


class MissingValueWarning(Warning):
    """Warning category used when inputs contain missing values."""


# Message text for missing-value warnings; note that it begins with a newline.
missing_value_warning_msg = (
    "\n"
    "Inputs contain missing values. Dropping rows with missing observations."
)


class MemoryWarning(Warning):
    """Warning category; presumably for memory-related conditions (not raised here)."""


class InferenceUnavailableWarning(Warning):
    """Warning category; presumably for unavailable inference (not raised here)."""


class SingletonWarning(Warning):
    """Warning category; presumably for singleton groups (not raised here)."""


class AttrDict(MutableMapping):
    """
    Ordered dictionary-like object that exposes keys as attributes

    Items are held in a plain ``dict`` stored in the instance ``__dict__``
    under the reserved name ``__private_dict__``. That name cannot be used
    as a key through item or attribute assignment.
    """

    def __init__(
        self, *args: Union[Mapping[Any, Any], Sequence[Tuple[Any, Any]]], **kwargs: Any
    ) -> None:
        # Write straight into the instance __dict__: the overridden
        # __setattr__ rejects the reserved name.
        self.__dict__["__private_dict__"] = dict(*args, **kwargs)

    def update(
        self, *args: Union[Mapping[Any, Any], Iterable[Tuple[Any, Any]]], **kwargs: Any
    ) -> None:
        """
        Update AD from dictionary or iterable E and F.

        If E is present and has a .keys() method, then does:
        for k in E: AD[k] = E[k]

        If E is present and lacks a .keys() method, then does:
        for k, v in E: AD[k] = v

        In either case, this is followed by: for k in F:  AD[k] = F[k]
        """
        self.__private_dict__.update(*args, **kwargs)

    def clear(self) -> None:
        """Remove all items from AD."""
        self.__private_dict__.clear()

    def copy(self) -> "AttrDict":
        """Create a shallow copy of AD"""
        ad = AttrDict()
        for key in self.__private_dict__.keys():
            ad[key] = self.__private_dict__[key]
        return ad

    def keys(self) -> AbstractSet[Any]:
        """Return an ordered list-like object providing a view on AD's keys"""
        return self.__private_dict__.keys()

    def items(self) -> AbstractSet[Tuple[Any, Any]]:
        """Return an ordered list-like object providing a view on AD's items"""
        return self.__private_dict__.items()

    def values(self) -> ValuesView[Any]:
        """Return an ordered list-like object providing a view on AD's values"""
        return self.__private_dict__.values()

    def pop(self, key: Label, default: Any = None) -> Any:
        """
        Remove specified key and return the corresponding value.

        If key is not found, ``default`` is returned. Because ``default``
        is None when omitted, a missing key never raises KeyError.
        """
        return self.__private_dict__.pop(key, default)

    def __len__(self) -> int:
        return len(self.__private_dict__)

    def __repr__(self) -> str:
        # Build the text from the backing dict's own repr; slicing a fixed
        # prefix off str(dict) truncates the contents.
        return "AttrDict({0!r})".format(self.__private_dict__)

    def __str__(self) -> str:
        return self.__repr__()

    def __contains__(self, item: Label) -> bool:
        return item in self.__private_dict__

    def __getitem__(self, item: Label) -> Any:
        return self.__private_dict__[item]

    def __setitem__(self, key: Label, value: Any) -> None:
        if key == "__private_dict__":
            raise KeyError("__private_dict__ is reserved and cannot be set.")
        self.__private_dict__[key] = value

    def __delitem__(self, key: Label) -> None:
        del self.__private_dict__[key]

    def __getattr__(self, key: Label) -> Any:
        # __getattr__ only runs after normal lookup fails. If the backing
        # store itself is missing (instance built via __new__, e.g. while
        # unpickling), looking it up below would re-enter this method
        # without end, so fail immediately instead.
        if key == "__private_dict__":
            raise AttributeError(key)
        store = self.__private_dict__
        if key not in store:
            raise AttributeError(
                "{0!r} object has no attribute {1!r}".format(
                    type(self).__name__, key
                )
            )
        return store[key]

    def __setattr__(self, key: Label, value: Any) -> None:
        if key == "__private_dict__":
            raise AttributeError("__private_dict__ is invalid")
        self.__private_dict__[key] = value

    def __delattr__(self, key: Label) -> None:
        del self.__private_dict__[key]

    def __dir__(self) -> Iterable[str]:
        # Advertise keys that are valid identifiers alongside the regular
        # attributes so they are discoverable by completion tools.
        out = list(map(str, self.__private_dict__.keys()))
        out += list(super(AttrDict, self).__dir__())
        filtered = [key for key in out if key.isidentifier()]
        return sorted(set(filtered))

    def __iter__(self) -> Iterator[Label]:
        return iter(self.__private_dict__)
def has_constant(
    x: NDArray, x_rank: Optional[int] = None
) -> Tuple[bool, Optional[int]]:
    """
    Check whether an array contains a constant or spans one

    Parameters
    ----------
    x: ndarray
        Array to be checked for a constant (n,k)
    x_rank : {int, None}
        Rank of x if previously computed.  If None, this value will be
        computed.

    Returns
    -------
    const : bool
        Flag indicating whether x contains a constant or has column span with
        a constant
    loc : {int, None}
        Column location of constant. When several columns qualify, the
        first one is reported. None if no constant is found.
    """
    # Columns that are identically one
    ones_cols = np.all(x == 1, axis=0)
    if np.any(ones_cols):
        # Take the first match: converting the full argwhere result to int
        # fails when more than one column is constant.
        return True, int(np.flatnonzero(ones_cols)[0])

    # Columns that are constant but not identically zero
    const_cols = (np.ptp(x, axis=0) == 0) & ~np.all(x == 0, axis=0)
    if np.any(const_cols):
        return True, int(np.flatnonzero(const_cols)[0])

    # No explicit constant column: check whether adding a column of ones
    # leaves the rank unchanged, i.e. the constant is in the column span.
    n = x.shape[0]
    aug_rank = np.linalg.matrix_rank(np.c_[np.ones((n, 1)), x])
    rank = np.linalg.matrix_rank(x) if x_rank is None else x_rank
    has_const = (aug_rank == rank) and x.shape[0] > x.shape[1]
    has_const = has_const or rank < min(x.shape)
    loc = None
    if has_const:
        # Regress a constant on x and report the column with the largest
        # variance-weighted absolute coefficient.
        out = lstsq(x, np.ones((n, 1)), rcond=None)
        beta = out[0].ravel()
        loc = int(np.argmax(np.abs(beta) * x.var(0)))
    return bool(has_const), loc


def inv_sqrth(x: NDArray) -> NDArray:
    """
    Matrix inverse square root

    Parameters
    ----------
    x : ndarray
        Real, symmetric matrix

    Returns
    -------
    ndarray
        Input to the power -1/2
    """
    vals, vecs = np.linalg.eigh(x)
    return vecs @ np.diag(1 / np.sqrt(vals)) @ vecs.T
class WaldTestStatistic(object):
    """
    Test statistic holder for Wald-type tests

    Parameters
    ----------
    stat : float
        The test statistic
    null : str
        A statement of the test's null hypothesis
    df : int
        Degree of freedom.
    df_denom : int, optional
        Denominator degree of freedom.  If provided, uses an
        F(df, df_denom) distribution.
    name : str, optional
        Name of test

    See Also
    --------
    InvalidTestStatistic
    """

    def __init__(
        self,
        stat: float,
        null: str,
        df: int,
        df_denom: Optional[int] = None,
        name: Optional[str] = None,
    ) -> None:
        self._stat = stat
        self._null = null
        self.df = df
        self.df_denom = df_denom
        self._name = name
        # chi-square reference distribution unless a denominator degree of
        # freedom is supplied, in which case an F distribution is used
        if df_denom is None:
            self.dist = chi2(df)
            self.dist_name = "chi2({0})".format(df)
        else:
            self.dist = f(df, df_denom)
            self.dist_name = "F({0},{1})".format(df, df_denom)

    @property
    def stat(self) -> float:
        """Test statistic"""
        return self._stat

    @property
    def pval(self) -> float:
        """P-value of test statistic"""
        # The survival function keeps precision in the upper tail, where
        # 1 - cdf rounds to exactly zero for large statistics.
        return self.dist.sf(self.stat)

    @property
    def critical_values(self) -> Optional[Dict[str, float]]:
        """Critical values test for common test sizes"""
        return dict(zip(["10%", "5%", "1%"], self.dist.ppf([0.9, 0.95, 0.99])))

    @property
    def null(self) -> str:
        """Null hypothesis"""
        return self._null

    def __str__(self) -> str:
        name = ""
        if self._name is not None:
            name = self._name + "\n"
        msg = (
            "{name}H0: {null}\nStatistic: {stat:0.4f}\n"
            "P-value: {pval:0.4f}\nDistributed: {dist}"
        )
        return msg.format(
            name=name,
            null=self.null,
            stat=self.stat,
            pval=self.pval,
            dist=self.dist_name,
        )

    def __repr__(self) -> str:
        return (
            self.__str__()
            + "\n"
            + self.__class__.__name__
            + ", id: {0}".format(hex(id(self)))
        )
class InvalidTestWarning(UserWarning):
    """Warning category; presumably used to flag invalid tests (not raised here)."""
class InvalidTestStatistic(WaldTestStatistic):
    """
    Class returned if a requested test is not valid for a model

    Parameters
    ----------
    reason : str
        Explanation why test is invalid
    name : str, optional
        Name of test

    See Also
    --------
    WaldTestStatistic
    """

    def __init__(self, reason: str, *, name: Optional[str] = None) -> None:
        self._reason = reason
        # np.nan rather than the np.NaN alias, which NumPy 2.0 removed.
        # The statistic and null are placeholders; df values are dummies
        # needed only to satisfy the parent initializer.
        super(InvalidTestStatistic, self).__init__(
            np.nan, np.nan, df=1, df_denom=1, name=name
        )
        self.dist_name = "None"

    @property
    def pval(self) -> float:
        """Always returns np.nan"""
        return np.nan

    @property
    def critical_values(self) -> None:
        """Always returns None"""
        return None

    def __str__(self) -> str:
        msg = "Invalid test statistic\n{reason}\n{name}"
        name = "" if self._name is None else self._name
        return msg.format(name=name, reason=self._reason)
class InapplicableTestStatistic(WaldTestStatistic):
    """
    Class returned if a requested test is not applicable for a specification

    Parameters
    ----------
    reason : str
        Explanation why test is invalid
    name : str, optional
        Name of test

    See Also
    --------
    WaldTestStatistic
    """

    def __init__(self, *, reason: Optional[str] = None, name: Optional[str] = None):
        self._reason = reason
        if reason is None:
            self._reason = "Test is not applicable to model specification"

        # np.nan rather than the np.NaN alias, which NumPy 2.0 removed.
        super(InapplicableTestStatistic, self).__init__(
            np.nan, np.nan, df=1, df_denom=1, name=name
        )
        self.dist_name = "None"

    @property
    def pval(self) -> float:
        """Always returns np.nan"""
        return np.nan

    @property
    def critical_values(self) -> None:
        """Always returns None"""
        return None

    def __str__(self) -> str:
        msg = "Irrelevant test statistic\n{reason}\n{name}"
        name = "" if self._name is None else self._name
        return msg.format(name=name, reason=self._reason)


def _str(v: float) -> str:
    """Preferred basic formatter"""
    if np.isnan(v):
        return " "
    av = abs(v)
    digits = 0
    if av != 0:
        digits = np.ceil(np.log10(av))
    # Very large or very small magnitudes use general (scientific) format
    if digits > 4 or digits <= -4:
        return "{0:8.4g}".format(v)

    # Otherwise fixed-point, with fewer decimals as the integer part grows
    if digits > 0:
        d = int(5 - digits)
    else:
        d = int(4)

    format_str = "{0:" + "0.{0}f".format(d) + "}"
    return format_str.format(v)


def pval_format(v: float) -> str:
    """Preferred formatting for x in [0,1]"""
    if np.isnan(v):
        return " "
    return "{0:4.4f}".format(v)


class _SummaryStr(object):
    """
    Mixin class for results classes to automatically show the summary.
    """

    @property
    def summary(self) -> Summary:
        # Empty placeholder summary; presumably overridden by subclasses
        return Summary()

    def __str__(self) -> str:
        return self.summary.as_text()

    def __repr__(self) -> str:
        return (
            self.__str__()
            + "\n"
            + self.__class__.__name__
            + ", id: {0}".format(hex(id(self)))
        )

    def _repr_html_(self) -> str:
        return self.summary.as_html() + "<br/>id: {0}".format(hex(id(self)))


def ensure_unique_column(col_name: str, df: DataFrame, addition: str = "_") -> str:
    """
    Return a column name that is not present in df

    Wraps col_name in ``addition`` (both sides) repeatedly until the
    resulting name is not found in df.
    """
    while col_name in df:
        col_name = addition + col_name + addition
    return col_name


class _ModelComparison(_SummaryStr):
    """
    Base class for model comparisons

    Parameters
    ----------
    results : {dict, sequence}
        Results to compare. A sequence is labeled "Model 0", "Model 1", ...
        while a dict uses its keys as labels.
    precision : str
        One of 'tstats', 'std_errors' or 'pvalues'. Case-insensitive, and
        hyphens are treated as underscores.
    """

    # Result types accepted by __init__; empty here, so subclasses are
    # expected to override it.
    _supported: Tuple[Type, ...] = tuple()
    _PRECISION_TYPES = {
        "tstats": "T-stats",
        "pvalues": "P-values",
        "std_errors": "Std. Errors",
    }

    # TODO: Replace Any with better list of types
    def __init__(
        self,
        results: Union[Dict[str, Any], Sequence[Any]],
        *,
        precision: str = "tstats",
    ) -> None:
        if not isinstance(results, dict):
            _results: Dict[str, Any] = {}
            for i, res in enumerate(results):
                _results["Model " + str(i)] = res
        else:
            # Copy so later changes to the caller's dict are not reflected
            _results = {}
            _results.update(results)
        self._results = _results

        for key in self._results:
            if not isinstance(self._results[key], self._supported):
                raise TypeError("Results from unknown model")
        precision = precision.lower().replace("-", "_")
        if precision not in ("tstats", "pvalues", "std_errors"):
            raise ValueError(
                "Unknown precision value. Must be one of 'tstats', 'std_errors' "
                "or 'pvalues'."
            )
        self._precision = precision

    def _get_series_property(self, name: str) -> DataFrame:
        """Collect a Series-valued attribute from each result as columns"""
        out: List[Tuple[str, Series]] = [
            (k, getattr(v, name)) for k, v in self._results.items()
        ]
        cols = [v[0] for v in out]
        values = concat([v[1] for v in out], axis=1)
        values.columns = cols
        return values

    def _get_property(self, name: str) -> Series:
        """Collect an attribute from each result, indexed by model label"""
        out = {}
        items = []
        for k, v in self._results.items():
            items.append(k)
            out[k] = getattr(v, name)
        return Series(out, name=name).loc[items]

    @property
    def nobs(self) -> Series:
        """Number of observations for all models"""
        return self._get_property("nobs")

    @property
    def params(self) -> DataFrame:
        """Parameters for all models"""
        return self._get_series_property("params")

    @property
    def tstats(self) -> DataFrame:
        """Parameter t-stats for all models"""
        return self._get_series_property("tstats")

    @property
    def std_errors(self) -> DataFrame:
        """Parameter standard errors for all models"""
        return self._get_series_property("std_errors")

    @property
    def pvalues(self) -> DataFrame:
        """Parameter p-vals for all models"""
        return self._get_series_property("pvalues")

    @property
    def rsquared(self) -> Series:
        """Coefficients of determination (R**2)"""
        return self._get_property("rsquared")

    @property
    def f_statistic(self) -> DataFrame:
        """F-statistics and P-values"""
        out = self._get_property("f_statistic")
        out_df = DataFrame(
            np.empty((len(out), 2)), columns=["F stat", "P-value"], index=out.index
        )
        for loc in out.index:
            out_df.loc[loc] = out[loc].stat, out[loc].pval
        return out_df


def missing_warning(missing: ArrayLike) -> None:
    """Utility function to perform missing value check and warning"""
    if not np.any(missing):
        return
    # Imported at call time to read the package-level WARN_ON_MISSING flag
    import linearmodels

    if linearmodels.WARN_ON_MISSING:
        import warnings

        warnings.warn(missing_value_warning_msg, MissingValueWarning)


# TODO: typing for Any
def param_table(results: Any, title: str, pad_bottom: bool = False) -> SimpleTable:
    """
    Formatted standard parameter table

    Parameters
    ----------
    results : Any
        Object providing params, std_errors, tstats, pvalues and a
        conf_int() method.
    title : str
        Table title
    pad_bottom : bool
        Flag indicating whether to append a blank row for spacing

    Returns
    -------
    SimpleTable
        Table with one row per parameter
    """
    param_data = np.c_[
        np.asarray(results.params)[:, None],
        np.asarray(results.std_errors)[:, None],
        np.asarray(results.tstats)[:, None],
        np.asarray(results.pvalues)[:, None],
        results.conf_int(),
    ]
    data = []
    for row in param_data:
        txt_row = []
        for i, v in enumerate(row):
            # Column 3 holds the p-values, which use their own format
            func = _str
            if i == 3:
                func = pval_format
            txt_row.append(func(v))
        data.append(txt_row)
    header = ["Parameter", "Std. Err.", "T-stat", "P-value", "Lower CI", "Upper CI"]
    table_stubs = list(results.params.index)
    if pad_bottom:
        # Append blank row for spacing
        data.append([""] * 6)
        table_stubs += [""]

    return SimpleTable(
        data, stubs=table_stubs, txt_fmt=fmt_params, headers=header, title=title
    )


def format_wide(s: Sequence[str], cols: int) -> List[List[str]]:
    """
    Format a list of strings.

    Parameters
    ----------
    s : List[str]
        List of strings to format
    cols : int
        Number of columns in output

    Returns
    -------
    List[List[str]]
        The joined list.
    """
    lines = []
    line = ""
    for i, val in enumerate(s):
        if line == "":
            line = val
            if i + 1 != len(s):
                line += ", "
        else:
            temp = line + val
            if i + 1 != len(s):
                temp += ", "
            if len(temp) > cols:
                # Current line is full: emit it and start a new one
                lines.append([line])
                line = val
                if i + 1 != len(s):
                    line += ", "
            else:
                line = temp
    lines.append([line])
    return lines


def panel_to_frame(
    x: NDArray,
    items: Sequence[Label],
    major_axis: Sequence[Label],
    minor_axis: Sequence[Label],
    swap: bool = False,
) -> DataFrame:
    """
    Construct a multiindex DataFrame using Panel-like arguments

    Parameters
    ----------
    x : ndarray
        3-d array with size nite, nmajor, nminor
    items : list-like
        List like object with item labels
    major_axis : list-like
        List like object with major_axis labels
    minor_axis : list-like
        List like object with minor_axis labels
    swap : bool
        Swap is major and minor axes

    Returns
    -------
    DataFrame
        Frame with one column per item and a 2-level index named
        ("major", "minor")

    Notes
    -----
    This function is equivalent to

    Panel(x, items, major_axis, minor_axis).to_frame()

    if `swap` is True, it is equivalent to

    Panel(x, items, major_axis, minor_axis).swapaxes(1,2).to_frame()
    """
    # Build the index from integer positions first; the actual labels are
    # installed as levels at the end.
    nmajor = np.arange(len(major_axis))
    nminor = np.arange(len(minor_axis))
    final_levels = [list(major_axis), list(minor_axis)]
    mi = MultiIndex.from_product([nmajor, nminor])
    if x is not None:
        shape = x.shape
        x = x.reshape((shape[0], shape[1] * shape[2])).T
    df = DataFrame(x, columns=items, index=mi)
    if swap:
        df.index = mi.swaplevel()
        df.sort_index(inplace=True)
        final_levels = [list(minor_axis), list(major_axis)]
    # set_levels returns a new index; assign it back, and pass level by
    # keyword since current pandas has no inplace option here.
    df.index = df.index.set_levels(final_levels, level=[0, 1])
    df.index.names = ["major", "minor"]
    return df


def quadratic_form_test(
    params: ArrayLike,
    cov: ArrayLike,
    restriction: OptionalArrayLike = None,
    value: OptionalArrayLike = None,
    formula: Optional[Union[str, List[str]]] = None,
) -> WaldTestStatistic:
    """
    Wald test of linear equality restrictions

    Parameters
    ----------
    params : array_like
        Parameter estimates. Must expose an index when formula is used.
    cov : array_like
        Covariance of the parameter estimates
    restriction : array_like, optional
        Restriction loadings, one row per restriction
    value : array_like, optional
        Restriction values. Zeros if omitted.
    formula : {str, list[str]}, optional
        Constraint formula(s). Cannot be combined with restriction.

    Returns
    -------
    WaldTestStatistic
        Statistic with degrees of freedom equal to the number of
        restrictions

    Raises
    ------
    ValueError
        If both restriction and formula are provided
    """
    if formula is not None and restriction is not None:
        raise ValueError("restriction and formula cannot be used simultaneously.")
    if formula is not None:
        di = DesignInfo(list(params.index))
        lc = di.linear_constraint(formula)
        restriction, value = lc.coefs, lc.constants
    restriction = np.asarray(restriction)
    if value is None:
        value = np.zeros(restriction.shape[0])
    value = np.asarray(value).ravel()[:, None]
    diff = restriction @ np.asarray(params)[:, None] - value
    rcov = restriction @ cov @ restriction.T
    # The quadratic form is a 1 by 1 array; squeeze it to 0-d before the
    # float conversion, which NumPy deprecates for arrays with ndim > 0.
    stat = float(np.squeeze(diff.T @ np.linalg.inv(rcov) @ diff))
    df = restriction.shape[0]
    null = "Linear equality constraint is valid"
    name = "Linear Equality Hypothesis Test"

    return WaldTestStatistic(stat, null, df, name=name)


def get_string(d: Mapping[str, Any], key: str) -> Optional[str]:
    """
    Helper function that gets a string or None

    Parameters
    ----------
    d : Mapping[str, Any]
        A mapping.
    key : str
        The key to lookup.

    Returns
    -------
    {str, None}
        The string or None if the key is not in the dictionary. If in the
        dictionary, a type check is performed and TypeError is raised if
        the value is neither a str nor None.
    """
    out: Optional[str] = None
    if key in d:
        out = d[key]
        if out is not None:
            if isinstance(out, str):
                return out
            else:
                raise TypeError(f"{key} found in the dictionary but it is not a str.")
    return out


def get_float(d: Mapping[str, Any], key: str) -> Optional[float]:
    """
    Helper function that gets a float or None

    Parameters
    ----------
    d : Mapping[str, Any]
        A mapping.
    key : str
        The key to lookup.

    Returns
    -------
    {float, None}
        The float or None if the key is not in the dictionary. If in the
        dictionary, a type check is performed and TypeError is raised if
        the value is neither numeric nor None.
    """
    out: Optional[float] = None
    if key in d:
        out = d[key]
        if out is not None:
            if isinstance(out, (int, float, np.floating)):
                return float(out)
            else:
                raise TypeError(f"{key} found in the dictionary but it is not a float.")
    return out


def get_bool(d: Mapping[str, Any], key: str) -> bool:
    """
    Helper function that gets a bool, defaulting to False.

    Parameters
    ----------
    d : Mapping[str, Any]
        A mapping.
    key : str
        The key to lookup.

    Returns
    -------
    bool
        The boolean if the key is in the dictionary. If not found, or if
        the stored value is None, returns False. TypeError is raised if the
        value is neither a bool nor None.
    """
    out: Optional[bool] = False
    if key in d:
        out = d[key]
        if not (out is None or isinstance(out, bool)):
            raise TypeError(f"{key} found in the dictionary but it is not a bool.")
    return bool(out)


def get_array_like(d: Mapping[str, Any], key: str) -> Optional[ArrayLike]:
    """
    Helper function that gets an array-like or None

    Parameters
    ----------
    d : Mapping[str, Any]
        A mapping.
    key : str
        The key to lookup.

    Returns
    -------
    {array_like, None}
        The array-like or None if the key is not in the dictionary. If in
        the dictionary, a type check is performed and TypeError is raised
        if the value is neither array-like nor None.
    """
    out: Optional[ArrayLike] = None
    if key in d:
        out = d[key]
        if out is not None:
            array_like: Union[Any] = (np.ndarray, DataFrame, Series)
            # xarray is optional: accept DataArray only when it is installed
            try:
                import xarray as xr

                array_like += (xr.DataArray,)
            except ImportError:
                pass

            if isinstance(out, array_like):
                return out
            else:
                raise TypeError(
                    f"{key} found in the dictionary but it is not array-like."
                )
    return out