Source code for arch.unitroot.unitroot

from __future__ import annotations

from abc import ABCMeta, abstractmethod
from collections.abc import Sequence
from typing import cast
import warnings

from numpy import (
    abs,
    amax,
    amin,
    any as npany,
    arange,
    argwhere,
    array,
    asarray,
    ceil,
    cumsum,
    diag,
    diff,
    empty,
    float64,
    full,
    hstack,
    inf,
    interp,
    isnan,
    log,
    nan,
    ndarray,
    ones,
    pi,
    polyval,
    power,
    sort,
    sqrt,
    squeeze,
    sum as npsum,
)
from numpy.linalg import LinAlgError, inv, lstsq, matrix_rank, pinv, qr, solve
from pandas import DataFrame
from scipy.stats import norm
from statsmodels.iolib.summary import Summary
from statsmodels.iolib.table import SimpleTable
from statsmodels.regression.linear_model import OLS, RegressionResults
from statsmodels.tsa.tsatools import lagmat

from arch.typing import (
    ArrayLike,
    ArrayLike1D,
    ArrayLike2D,
    Float64Array,
    Literal,
    UnitRootTrend,
)
from arch.unitroot.critical_values.dfgls import (
    dfgls_cv_approx,
    dfgls_large_p,
    dfgls_small_p,
    dfgls_tau_max,
    dfgls_tau_min,
    dfgls_tau_star,
)
from arch.unitroot.critical_values.dickey_fuller import (
    adf_z_cv_approx,
    adf_z_large_p,
    adf_z_max,
    adf_z_min,
    adf_z_small_p,
    adf_z_star,
    tau_2010,
    tau_large_p,
    tau_max,
    tau_min,
    tau_small_p,
    tau_star,
)
from arch.unitroot.critical_values.kpss import kpss_critical_values
from arch.unitroot.critical_values.zivot_andrews import za_critical_values
from arch.utility import cov_nw
from arch.utility.array import AbstractDocStringInheritor, ensure1d, ensure2d
from arch.utility.exceptions import (
    InfeasibleTestException,
    InvalidLengthWarning,
    PerformanceWarning,
    invalid_length_doc,
)
from arch.utility.timeseries import add_trend

__all__ = [
    "ADF",
    "DFGLS",
    "PhillipsPerron",
    "KPSS",
    "VarianceRatio",
    "kpss_crit",
    "mackinnoncrit",
    "mackinnonp",
    "ZivotAndrews",
    "auto_bandwidth",
    "TREND_DESCRIPTION",
    "SHORT_TREND_DESCRIPTION",
]

TREND_MAP = {None: "n", 0: "c", 1: "ct", 2: "ctt"}

TREND_DESCRIPTION = {
    "n": "No Trend",
    "c": "Constant",
    "ct": "Constant and Linear Time Trend",
    "ctt": "Constant, Linear and Quadratic Time Trends",
    "t": "Linear Time Trend (No Constant)",
}

SHORT_TREND_DESCRIPTION = {
    "n": "No Trend",
    "c": "Constant",
    "ct": "Const and Linear Trend",
    "ctt": "Const., Lin. and Quad. Trends",
    "t": "Linear Time Trend (No Const.)",
}


def _is_reduced_rank(x: Float64Array | DataFrame) -> tuple[bool, int | None]:
    """
    Check if a matrix has reduced rank preferring quick checks
    """
    if x.shape[1] > x.shape[0]:
        return True, None
    elif npany(isnan(x)):
        return True, None
    elif sum(amax(x, axis=0) == amin(x, axis=0)) > 1:
        return True, None
    else:
        x_rank = matrix_rank(x)
        return x_rank < x.shape[1], x_rank


def _select_best_ic(
    method: Literal["aic", "bic", "t-stat"],
    nobs: float,
    sigma2: Float64Array,
    tstat: Float64Array,
) -> tuple[float, int]:
    """
    Computes the best information criteria

    Parameters
    ----------
    method : {"aic", "bic", "t-stat"}
        Method to use when finding the lag length
    nobs : float
        Number of observations in time series
    sigma2 : ndarray
        maxlag + 1 array containing MLE estimates of the residual variance
    tstat : ndarray
        maxlag + 1 array containing t-statistic values. Only used if method
        is "t-stat"

    Returns
    -------
    icbest : float
        Minimum value of the information criteria
    lag : int
        The lag length that maximizes the information criterion.
    """
    llf = -nobs / 2.0 * (log(2 * pi) + log(sigma2) + 1)
    maxlag = len(sigma2) - 1
    if method == "aic":
        crit = -2 * llf + 2 * arange(float(maxlag + 1))
        icbest, lag = min(zip(crit, arange(maxlag + 1)))
    elif method == "bic":
        crit = -2 * llf + log(nobs) * arange(float(maxlag + 1))
        icbest, lag = min(zip(crit, arange(maxlag + 1)))
    elif method == "t-stat":
        stop = 1.6448536269514722
        large_tstat = abs(tstat) >= stop
        lag = int(squeeze(max(argwhere(large_tstat))))
        icbest = float(tstat[lag])
    else:
        raise ValueError("Unknown method")

    return icbest, lag


singular_array_error: str = """\
The maximum lag you are considering ({max_lags}) results in an ADF regression with a
singular regressor matrix after including {lag} lags, and so a specification test be
run. This may occur if your series have little variation and so is locally constant,
or may occur if you are attempting to test a very short series. You can manually set
maximum lag length to consider smaller models.\
"""


def _autolag_ols_low_memory(
    y: Float64Array,
    maxlag: int,
    trend: UnitRootTrend,
    method: Literal["aic", "bic", "t-stat"],
) -> tuple[float, int]:
    """
    Computes the lag length that minimizes an info criterion .

    Parameters
    ----------
    y : ndarray
        Variable being tested for a unit root
    maxlag : int
        The highest lag order for lag length selection.
    trend : {"n", "c", "ct", "ctt"}
        Trend in the model
    method : {"aic", "bic", "t-stat"}
        Method to use when finding the lag length

    Returns
    -------
    icbest : float
        Minimum value of the information criteria
    lag : int
        The lag length that maximizes the information criterion.

    Notes
    -----
    Minimizes creation of large arrays. Uses approx 6 * nobs temporary values
    """
    y = asarray(y)
    lower_method = method.lower()
    deltay = diff(y)
    deltay = deltay / sqrt(deltay @ deltay)
    lhs = deltay[maxlag:][:, None]
    level = y[maxlag:-1]
    level = level / sqrt(level @ level)
    trendx: list[Float64Array] = []
    nobs = lhs.shape[0]
    if trend == "n":
        trendx.append(empty((nobs, 0)))
    else:
        if "tt" in trend:
            tt = arange(1, nobs + 1, dtype=float64)[:, None] ** 2
            tt *= sqrt(5) / float(nobs) ** (5 / 2)
            trendx.append(tt)
        if "t" in trend:
            t = arange(1, nobs + 1, dtype=float64)[:, None]
            t *= sqrt(3) / float(nobs) ** (3 / 2)
            trendx.append(t)
        if trend.startswith("c"):
            trendx.append(ones((nobs, 1)) / sqrt(nobs))
    rhs = hstack([level[:, None], hstack(trendx)])
    m = rhs.shape[1]
    xpx = full((m + maxlag, m + maxlag), nan)
    xpy = full((m + maxlag, 1), nan)
    assert isinstance(xpx, ndarray)
    assert isinstance(xpy, ndarray)
    xpy[:m] = rhs.T @ lhs
    xpx[:m, :m] = rhs.T @ rhs
    for i in range(maxlag):
        x1 = deltay[maxlag - i - 1 : -(1 + i)]
        block = rhs.T @ x1
        xpx[m + i, :m] = block
        xpx[:m, m + i] = block
        xpy[m + i] = x1 @ lhs
        for j in range(i, maxlag):
            x2 = deltay[maxlag - j - 1 : -(1 + j)]
            x1px2 = x1 @ x2
            xpx[m + i, m + j] = x1px2
            xpx[m + j, m + i] = x1px2
    ypy = lhs.T @ lhs
    sigma2 = empty(maxlag + 1)

    tstat = empty(maxlag + 1)
    tstat[0] = inf
    for i in range(m, m + maxlag + 1):
        xpx_sub = xpx[:i, :i]
        try:
            b = solve(xpx_sub, xpy[:i])
        except LinAlgError:
            raise InfeasibleTestException(
                singular_array_error.format(max_lags=maxlag, lag=m - i)
            )
        sigma2[i - m] = squeeze(ypy - b.T @ xpx_sub @ b) / nobs
        if lower_method == "t-stat":
            xpxi = inv(xpx_sub)
            stderr = sqrt(sigma2[i - m] * xpxi[-1, -1])
            tstat[i - m] = squeeze(b[-1]) / stderr

    return _select_best_ic(method, nobs, sigma2, tstat)


def _autolag_ols(
    endog: ArrayLike1D,
    exog: ArrayLike2D,
    startlag: int,
    maxlag: int,
    method: Literal["aic", "bic", "t-stat"],
) -> tuple[float, int]:
    """
    Returns the results for the lag length that maximizes the info criterion.

    Parameters
    ----------
    endog : {ndarray, Series}
        nobs array containing endogenous variable
    exog : {ndarray, DataFrame}
        nobs by (startlag + maxlag) array containing lags and possibly other
        variables
    startlag : int
        The first zero-indexed column to hold a lag.  See Notes.
    maxlag : int
        The highest lag order for lag length selection.
    method : {"aic", "bic", "t-stat"}

        * aic - Akaike Information Criterion
        * bic - Bayes Information Criterion
        * t-stat - Based on last lag

    Returns
    -------
    icbest : float
        Minimum value of the information criteria
    lag : int
        The lag length that maximizes the information criterion.

    Notes
    -----
    Does estimation like mod(endog, exog[:,:i]).fit()
    where i goes from lagstart to lagstart + maxlag + 1.  Therefore, lags are
    assumed to be in contiguous columns from low to high lag length with
    the highest lag in the last column.
    """
    lower_method = method.lower()
    exog_singular, exog_rank = _is_reduced_rank(exog)
    if exog_singular:
        if exog_rank is None:
            exog_rank = matrix_rank(exog)
        raise InfeasibleTestException(
            singular_array_error.format(
                max_lags=maxlag, lag=max(exog_rank - startlag, 0)
            )
        )
    q, r = qr(exog)
    qpy = q.T @ endog
    ypy = endog.T @ endog
    xpx = exog.T @ exog

    sigma2 = empty(maxlag + 1)
    tstat = empty(maxlag + 1)
    nobs = float(endog.shape[0])
    tstat[0] = inf
    for i in range(startlag, startlag + maxlag + 1):
        b = solve(r[:i, :i], qpy[:i])
        sigma2[i - startlag] = squeeze(ypy - b.T @ xpx[:i, :i] @ b) / nobs
        if lower_method == "t-stat" and i > startlag:
            xpxi = inv(xpx[:i, :i])
            stderr = sqrt(sigma2[i - startlag] * xpxi[-1, -1])
            tstat[i - startlag] = squeeze(b[-1]) / stderr

    return _select_best_ic(method, nobs, sigma2, tstat)


def _df_select_lags(
    y: Float64Array,
    trend: Literal["n", "c", "ct", "ctt"],
    max_lags: int | None,
    method: Literal["aic", "bic", "t-stat"],
    low_memory: bool = False,
) -> tuple[float, int]:
    """
    Helper method to determine the best lag length in DF-like regressions

    Parameters
    ----------
    y : ndarray
        The data for the lag selection exercise
    trend : {"n","c","ct","ctt"}
        The trend order
    max_lags : int
        The maximum number of lags to check.  This setting affects all
        estimation since the sample is adjusted by max_lags when
        fitting the models
    method : {"aic", "bic", "t-stat"}
        The method to use when estimating the model
    low_memory : bool
        Flag indicating whether to use the low-memory algorithm for
        lag-length selection.

    Returns
    -------
    best_ic : float
        The information criteria at the selected lag
    best_lag : int
        The selected lag

    Notes
    -----
    If max_lags is None, the default value of 12 * (nobs/100)**(1/4) is used.
    """
    nobs = y.shape[0]
    # This is the absolute maximum number of lags possible,
    # only needed to very short time series.
    max_max_lags = max((nobs - 1) // 2 - 1, 0)
    if trend != "n":
        max_max_lags -= len(trend)
    if max_lags is None:
        max_lags = int(ceil(12.0 * power(nobs / 100.0, 1 / 4.0)))
        max_lags = max(min(max_lags, max_max_lags), 0)
        if max_lags > 119:
            warnings.warn(
                "The value of max_lags was not specified and has been calculated as "
                f"{max_lags}. Searching over a large lag length with a sample size "
                f"of {nobs} is likely to be slow. Consider directly setting "
                "``max_lags`` to a small value to avoid this performance issue.",
                PerformanceWarning,
            )
    assert max_lags is not None
    if low_memory:
        out = _autolag_ols_low_memory(y, max_lags, trend, method)
        return out
    delta_y = diff(y)
    rhs = lagmat(delta_y[:, None], max_lags, trim="both", original="in")
    nobs = rhs.shape[0]
    rhs[:, 0] = y[-nobs - 1 : -1]  # replace 0 with level of y
    lhs = delta_y[-nobs:]

    if trend != "n":
        full_rhs = add_trend(rhs, trend, prepend=True)
    else:
        full_rhs = rhs

    start_lag = full_rhs.shape[1] - rhs.shape[1] + 1
    ic_best, best_lag = _autolag_ols(lhs, full_rhs, start_lag, max_lags, method)
    return ic_best, best_lag


def _add_column_names(rhs: Float64Array, lags: int) -> DataFrame:
    """Return a DataFrame with named columns"""
    lag_names = [f"Diff.L{i}" for i in range(1, lags + 1)]
    return DataFrame(rhs, columns=["Level.L1"] + lag_names)


def _estimate_df_regression(
    y: Float64Array, trend: Literal["n", "c", "ct", "ctt"], lags: int
) -> RegressionResults:
    """Helper function that estimates the core (A)DF regression

    Parameters
    ----------
    y : ndarray
        The data for the lag selection
    trend : {"n","c","ct","ctt"}
        The trend order
    lags : int
        The number of lags to include in the ADF regression

    Returns
    -------
    ols_res : OLSResults
        A results class object produced by OLS.fit()

    Notes
    -----
    See statsmodels.regression.linear_model.OLS for details on the results
    returned
    """
    delta_y = diff(y)

    rhs = lagmat(delta_y[:, None], lags, trim="both", original="in")
    nobs = rhs.shape[0]
    lhs = rhs[:, 0].copy()  # lag-0 values are lhs, Is copy() necessary?
    rhs[:, 0] = y[-nobs - 1 : -1]  # replace lag 0 with level of y
    rhs = _add_column_names(rhs, lags)

    if trend != "n":
        rhs = add_trend(rhs.iloc[:, : lags + 1], trend)

    return OLS(lhs, rhs).fit()


class UnitRootTest(metaclass=ABCMeta):
    """Base class to be used for inheritance in unit root bootstrap"""

    def __init__(
        self,
        y: ArrayLike,
        lags: int | None,
        trend: UnitRootTrend | Literal["t"],
        valid_trends: list[str] | tuple[str, ...],
    ) -> None:
        self._y = ensure1d(y, "y", series=False)
        self._delta_y = diff(y)
        self._nobs = self._y.shape[0]
        self._lags = int(lags) if lags is not None else lags
        if self._lags is not None and self._lags < 0:
            raise ValueError("lags must be non-negative.")
        self._valid_trends = list(valid_trends)
        if trend not in self.valid_trends:
            raise ValueError("trend not understood")
        self._trend = trend
        self._stat: float | None = None
        self._critical_values: dict[str, float] = {}
        self._pvalue: float | None = None
        self._null_hypothesis = "The process contains a unit root."
        self._alternative_hypothesis = "The process is weakly stationary."
        self._test_name = ""
        self._title = ""
        self._summary_text: list[str] = []

    def __str__(self) -> str:
        return self.summary().__str__()

    def __repr__(self) -> str:
        return str(type(self)) + '\n"""\n' + self.__str__() + '\n"""'

    def _repr_html_(self) -> str:
        """Display as HTML for IPython notebook."""
        return self.summary().as_html()

    @abstractmethod
    def _check_specification(self) -> None:
        """
        Check that the data are compatible with running a test.
        """

    @abstractmethod
    def _compute_statistic(self) -> None:
        """This is the core routine that computes the test statistic, computes
        the p-value and constructs the critical values.
        """

    def _compute_if_needed(self) -> None:
        """Checks whether the statistic needs to be computed, and computed if
        needed
        """
        if self._stat is None:
            self._check_specification()
            self._compute_statistic()

    @property
    def null_hypothesis(self) -> str:
        """The null hypothesis"""
        return self._null_hypothesis

    @property
    def alternative_hypothesis(self) -> str:
        """The alternative hypothesis"""
        return self._alternative_hypothesis

    @property
    def nobs(self) -> int:
        """The number of observations used when computing the test statistic.
        Accounts for loss of data due to lags for regression-based bootstrap."""
        return self._nobs

    @property
    def valid_trends(self) -> list[str]:
        """List of valid trend terms."""
        return self._valid_trends

    @property
    def pvalue(self) -> float:
        """Returns the p-value for the test statistic"""
        self._compute_if_needed()
        assert self._pvalue is not None
        return self._pvalue

    @property
    def stat(self) -> float:
        """The test statistic for a unit root"""
        self._compute_if_needed()
        assert self._stat is not None
        return self._stat

    @property
    def critical_values(self) -> dict[str, float]:
        """Dictionary containing critical values specific to the test, number of
        observations and included deterministic trend terms.
        """
        self._compute_if_needed()
        return self._critical_values

    def summary(self) -> Summary:
        """Summary of test, containing statistic, p-value and critical values"""
        table_data = [
            ("Test Statistic", f"{self.stat:0.3f}"),
            ("P-value", f"{self.pvalue:0.3f}"),
            ("Lags", f"{self.lags:d}"),
        ]
        title = self._title

        if not title:
            title = self._test_name + " Results"
        table = SimpleTable(
            table_data,
            stubs=None,
            title=title,
            colwidths=18,
            datatypes=[0, 1],
            data_aligns=("l", "r"),
        )

        smry = Summary()
        smry.tables.append(table)

        cv_string = "Critical Values: "
        cv = self._critical_values.keys()
        cv_numeric = array([float(x.split("%")[0]) for x in cv])
        cv_numeric = sort(cv_numeric)
        for val in cv_numeric:
            p = str(int(val)) + "%"
            cv_string += f"{self._critical_values[p]:0.2f}"
            cv_string += " (" + p + ")"
            if val != cv_numeric[-1]:
                cv_string += ", "

        extra_text = [
            "Trend: " + TREND_DESCRIPTION[self._trend],
            cv_string,
            "Null Hypothesis: " + self.null_hypothesis,
            "Alternative Hypothesis: " + self.alternative_hypothesis,
        ]

        smry.add_extra_txt(extra_text)
        if self._summary_text:
            smry.add_extra_txt(self._summary_text)
        return smry

    @property
    def lags(self) -> int:
        """Sets or gets the number of lags used in the model.
        When bootstrap use DF-type regressions, lags is the number of lags in the
        regression model.  When bootstrap use long-run variance estimators, lags
        is the number of lags used in the long-run variance estimator.
        """
        self._compute_if_needed()
        assert self._lags is not None
        return self._lags

    @property
    def y(self) -> ArrayLike:
        """Returns the data used in the test statistic"""
        return self._y

    @property
    def trend(self) -> str:
        """Sets or gets the deterministic trend term used in the test. See
        valid_trends for a list of supported trends
        """
        return self._trend


[docs] class ADF(UnitRootTest, metaclass=AbstractDocStringInheritor): """ Augmented Dickey-Fuller unit root test Parameters ---------- y : {ndarray, Series} The data to test for a unit root lags : int, optional The number of lags to use in the ADF regression. If omitted or None, `method` is used to automatically select the lag length with no more than `max_lags` are included. trend : {"n", "c", "ct", "ctt"}, optional The trend component to include in the test - "n" - No trend components - "c" - Include a constant (Default) - "ct" - Include a constant and linear time trend - "ctt" - Include a constant and linear and quadratic time trends max_lags : int, optional The maximum number of lags to use when selecting lag length method : {"AIC", "BIC", "t-stat"}, optional The method to use when selecting the lag length - "AIC" - Select the minimum of the Akaike IC - "BIC" - Select the minimum of the Schwarz/Bayesian IC - "t-stat" - Select the minimum of the Schwarz/Bayesian IC low_memory : bool Flag indicating whether to use a low memory implementation of the lag selection algorithm. The low memory algorithm is slower than the standard algorithm but will use 2-4% of the memory required for the standard algorithm. This options allows automatic lag selection to be used in very long time series. If None, use automatic selection of algorithm. Notes ----- The null hypothesis of the Augmented Dickey-Fuller is that there is a unit root, with the alternative that there is no unit root. If the pvalue is above a critical size, then the null cannot be rejected that there and the series appears to be a unit root. The p-values are obtained through regression surface approximation from MacKinnon (1994) using the updated 2010 tables. If the p-value is close to significant, then the critical values should be used to judge whether to reject the null. The autolag option and maxlag for it are described in Greene. Examples -------- >>> from arch.unitroot import ADF >>> import numpy as np >>> import statsmodels.api as sm >>> data = sm.datasets.macrodata.load().data >>> inflation = np.diff(np.log(data["cpi"])) >>> adf = ADF(inflation) >>> print("{0:0.4f}".format(adf.stat)) -3.0931 >>> print("{0:0.4f}".format(adf.pvalue)) 0.0271 >>> adf.lags 2 >>> adf.trend="ct" >>> print("{0:0.4f}".format(adf.stat)) -3.2111 >>> print("{0:0.4f}".format(adf.pvalue)) 0.0822 References ---------- .. [*] Greene, W. H. 2011. Econometric Analysis. Prentice Hall: Upper Saddle River, New Jersey. .. [*] Hamilton, J. D. 1994. Time Series Analysis. Princeton: Princeton University Press. .. [*] MacKinnon, J.G. 1994. "Approximate asymptotic distribution functions for unit-root and cointegration bootstrap. `Journal of Business and Economic Statistics` 12, 167-76. .. [*] MacKinnon, J.G. 2010. "Critical Values for Cointegration Tests." Queen's University, Dept of Economics, Working Papers. Available at https://ideas.repec.org/p/qed/wpaper/1227.html """ def __init__( self, y: ArrayLike, lags: int | None = None, trend: UnitRootTrend = "c", max_lags: int | None = None, method: Literal["aic", "bic", "t-stat"] = "aic", low_memory: bool | None = None, ) -> None: valid_trends = ("n", "c", "ct", "ctt") super().__init__(y, lags, trend, valid_trends) self._max_lags = max_lags self._method = method self._test_name = "Augmented Dickey-Fuller" self._regression = None self._low_memory = bool(low_memory) if low_memory is None: self._low_memory = True if self.y.shape[0] > 1e5 else False def _select_lag(self) -> None: ic_best, best_lag = _df_select_lags( self._y, cast(UnitRootTrend, self._trend), self._max_lags, self._method, low_memory=self._low_memory, ) self._ic_best = ic_best self._lags = best_lag def _check_specification(self) -> None: trend_order = len(self._trend) if self._trend not in ("n", "nc") else 0 lag_len = 0 if self._lags is None else self._lags required = 3 + trend_order + lag_len if self._y.shape[0] < required: raise InfeasibleTestException( f"A minimum of {required} observations are needed to run an ADF with " f"trend {self.trend} and the user-specified number of lags." ) def _compute_statistic(self) -> None: if self._lags is None: self._select_lag() assert self._lags is not None y, trend, lags = self._y, self._trend, self._lags resols = _estimate_df_regression(y, cast(UnitRootTrend, trend), lags) self._regression = resols (self._stat, *_) = (stat, *_) = resols.tvalues self._nobs = int(resols.nobs) self._pvalue = mackinnonp( stat, regression=cast(Literal["n", "c", "ct", "ctt"], trend), num_unit_roots=1, ) critical_values = mackinnoncrit( num_unit_roots=1, regression=cast(Literal["n", "c", "ct", "ctt"], trend), nobs=resols.nobs, ) self._critical_values = { "1%": critical_values[0], "5%": critical_values[1], "10%": critical_values[2], } @property def regression(self) -> RegressionResults: """Returns the OLS regression results from the ADF model estimated""" self._compute_if_needed() return self._regression @property def max_lags(self) -> int | None: """Sets or gets the maximum lags used when automatically selecting lag length""" return self._max_lags
[docs] class DFGLS(UnitRootTest, metaclass=AbstractDocStringInheritor): """ Elliott, Rothenberg and Stock's ([1]_) GLS detrended Dickey-Fuller Parameters ---------- y : {ndarray, Series} The data to test for a unit root lags : int, optional The number of lags to use in the ADF regression. If omitted or None, `method` is used to automatically select the lag length with no more than `max_lags` are included. trend : {"c", "ct"}, optional The trend component to include in the test - "c" - Include a constant (Default) - "ct" - Include a constant and linear time trend max_lags : int, optional The maximum number of lags to use when selecting lag length. When using automatic lag length selection, the lag is selected using OLS detrending rather than GLS detrending ([2]_). method : {"AIC", "BIC", "t-stat"}, optional The method to use when selecting the lag length - "AIC" - Select the minimum of the Akaike IC - "BIC" - Select the minimum of the Schwarz/Bayesian IC - "t-stat" - Select the minimum of the Schwarz/Bayesian IC Notes ----- The null hypothesis of the Dickey-Fuller GLS is that there is a unit root, with the alternative that there is no unit root. If the pvalue is above a critical size, then the null cannot be rejected and the series appears to be a unit root. DFGLS differs from the ADF test in that an initial GLS detrending step is used before a trend-less ADF regression is run. Critical values and p-values when trend is "c" are identical to the ADF. When trend is set to "ct", they are from ... Examples -------- >>> from arch.unitroot import DFGLS >>> import numpy as np >>> import statsmodels.api as sm >>> data = sm.datasets.macrodata.load().data >>> inflation = np.diff(np.log(data["cpi"])) >>> dfgls = DFGLS(inflation) >>> print("{0:0.4f}".format(dfgls.stat)) -2.7611 >>> print("{0:0.4f}".format(dfgls.pvalue)) 0.0059 >>> dfgls.lags 2 >>> dfgls.trend = "ct" >>> print("{0:0.4f}".format(dfgls.stat)) -2.9036 >>> print("{0:0.4f}".format(dfgls.pvalue)) 0.0447 References ---------- .. [1] Elliott, G. R., T. J. Rothenberg, and J. H. Stock. 1996. Efficient bootstrap for an autoregressive unit root. Econometrica 64: 813-836 .. [2] Perron, P., & Qu, Z. (2007). A simple modification to improve the finite sample properties of Ng and Perron's unit root tests. Economics letters, 94(1), 12-19. """ def __init__( self, y: ArrayLike, lags: int | None = None, trend: Literal["c", "ct"] = "c", max_lags: int | None = None, method: Literal["aic", "bic", "t-stat"] = "aic", low_memory: bool | None = None, ) -> None: valid_trends = ("c", "ct") super().__init__(y, lags, trend, valid_trends) self._max_lags = max_lags self._method = method self._regression = None self._low_memory = low_memory if low_memory is None: self._low_memory = True if self.y.shape[0] >= 1e5 else False self._test_name = "Dickey-Fuller GLS" if trend == "c": self._c = -7.0 else: self._c = -13.5 def _check_specification(self) -> None: trend_order = len(self._trend) lag_len = 0 if self._lags is None else self._lags required = 3 + trend_order + lag_len if self._y.shape[0] < required: raise InfeasibleTestException( f"A minimum of {required} observations are needed to run an ADF with " f"trend {self.trend} and the user-specified number of lags." ) def _compute_statistic(self) -> None: """Core routine to estimate DF-GLS test statistic""" # 1. GLS detrend trend, c = self._trend, self._c nobs = self._y.shape[0] ct = c / nobs z = add_trend(nobs=nobs, trend=trend) delta_z = z.copy() delta_z[1:, :] = delta_z[1:, :] - (1 + ct) * delta_z[:-1, :] delta_y = self._y.copy()[:, None] delta_y[1:] = delta_y[1:] - (1 + ct) * delta_y[:-1] detrend_coef = pinv(delta_z) @ delta_y y = self._y y_detrended = y - (z @ detrend_coef).ravel() # 2. determine lag length, if needed if self._lags is None: max_lags, method = self._max_lags, self._method assert self._low_memory is not None self._lags = ADF(self._y, method=method, max_lags=max_lags).lags ols_detrend_coef = lstsq(z, y, rcond=None)[0] y_ols_detrend = y - z @ ols_detrend_coef icbest, bestlag = _df_select_lags( y_ols_detrend, "n", max_lags, method, low_memory=self._low_memory ) self._lags = bestlag # 3. Run Regression lags = self._lags resols = _estimate_df_regression(y_detrended, lags=lags, trend="n") self._regression = resols self._nobs = int(resols.nobs) self._stat, *_ = resols.tvalues assert self._stat is not None self._pvalue = mackinnonp( self._stat, regression=cast(Literal["c", "ct"], trend), dist_type="dfgls" ) critical_values = mackinnoncrit( regression=cast(Literal["c", "ct"], trend), nobs=self._nobs, dist_type="dfgls", ) self._critical_values = { "1%": critical_values[0], "5%": critical_values[1], "10%": critical_values[2], } @property def trend(self) -> str: return self._trend @property def regression(self) -> RegressionResults: """Returns the OLS regression results from the ADF model estimated""" self._compute_if_needed() return self._regression @property def max_lags(self) -> int | None: """Sets or gets the maximum lags used when automatically selecting lag length""" return self._max_lags
[docs] class PhillipsPerron(UnitRootTest, metaclass=AbstractDocStringInheritor): """ Phillips-Perron unit root test Parameters ---------- y : {ndarray, Series} The data to test for a unit root lags : int, optional The number of lags to use in the Newey-West estimator of the long-run covariance. If omitted or None, the lag length is set automatically to 12 * (nobs/100) ** (1/4) trend : {"n", "c", "ct"}, optional The trend component to include in the test - "n" - No trend components - "c" - Include a constant (Default) - "ct" - Include a constant and linear time trend test_type : {"tau", "rho"} The test to use when computing the test statistic. "tau" is based on the t-stat and "rho" uses a test based on nobs times the re-centered regression coefficient Notes ----- The null hypothesis of the Phillips-Perron (PP) test is that there is a unit root, with the alternative that there is no unit root. If the pvalue is above a critical size, then the null cannot be rejected that there and the series appears to be a unit root. Unlike the ADF test, the regression estimated includes only one lag of the dependant variable, in addition to trend terms. Any serial correlation in the regression errors is accounted for using a long-run variance estimator (currently Newey-West). The p-values are obtained through regression surface approximation from MacKinnon (1994) using the updated 2010 tables. If the p-value is close to significant, then the critical values should be used to judge whether to reject the null. Examples -------- >>> from arch.unitroot import PhillipsPerron >>> import numpy as np >>> import statsmodels.api as sm >>> data = sm.datasets.macrodata.load().data >>> inflation = np.diff(np.log(data["cpi"])) >>> pp = PhillipsPerron(inflation) >>> print("{0:0.4f}".format(pp.stat)) -8.1356 >>> print("{0:0.4f}".format(pp.pvalue)) 0.0000 >>> pp.lags 15 >>> pp.trend = "ct" >>> print("{0:0.4f}".format(pp.stat)) -8.2022 >>> print("{0:0.4f}".format(pp.pvalue)) 0.0000 >>> pp.test_type = "rho" >>> print("{0:0.4f}".format(pp.stat)) -120.3271 >>> print("{0:0.4f}".format(pp.pvalue)) 0.0000 References ---------- .. [*] Hamilton, J. D. 1994. Time Series Analysis. Princeton: Princeton University Press. .. [*] Newey, W. K., and K. D. West. 1987. "A simple, positive semidefinite, heteroskedasticity and autocorrelation consistent covariance matrix". Econometrica 55, 703-708. .. [*] Phillips, P. C. B., and P. Perron. 1988. "Testing for a unit root in time series regression". Biometrika 75, 335-346. .. [*] MacKinnon, J.G. 1994. "Approximate asymptotic distribution functions for unit-root and cointegration bootstrap". Journal of Business and Economic Statistics. 12, 167-76. .. [*] MacKinnon, J.G. 2010. "Critical Values for Cointegration Tests." Queen's University, Dept of Economics, Working Papers. Available at https://ideas.repec.org/p/qed/wpaper/1227.html """ def __init__( self, y: ArrayLike, lags: int | None = None, trend: Literal["n", "c", "ct"] = "c", test_type: Literal["tau", "rho"] = "tau", ) -> None: valid_trends = ("n", "c", "ct") super().__init__(y, lags, trend, valid_trends) self._test_type = test_type self._stat_rho = None self._stat_tau = None self._test_name = "Phillips-Perron Test" self._lags = lags self._regression = None def _check_specification(self) -> None: trend_order = len(self._trend) if self._trend not in ("n", "nc") else 0 lag_len = 0 if self._lags is None else self._lags required = max(3 + trend_order, lag_len) if self._y.shape[0] < required: raise InfeasibleTestException( f"A minimum of {required} observations are needed to run an ADF with " f"trend {self.trend} and the user-specified number of lags." ) def _compute_statistic(self) -> None: """Core routine to estimate PP test statistics""" # 1. Estimate Regression y, trend = self._y, self._trend nobs = y.shape[0] if self._lags is None: self._lags = int(ceil(12.0 * power(nobs / 100.0, 1 / 4.0))) lags = self._lags rhs = asarray(y, dtype=float)[:-1, None] rhs_df = _add_column_names(rhs, 0) lhs = y[1:, None] if trend != "n": rhs_df = add_trend(rhs_df, trend) mod = OLS(lhs, rhs_df) resols = mod.fit() self._regression = mod.fit(cov_type="HAC", cov_kwds={"maxlags": lags}) k = rhs_df.shape[1] n, u = resols.nobs, resols.resid if u.shape[0] < lags: raise InfeasibleTestException( f"The number of observations {u.shape[0]} is less than the number of" f"lags in the long-run covariance estimator, {lags}. You must have " "lags <= nobs." ) lam2 = cov_nw(u, lags, demean=False) lam = sqrt(lam2) # 2. Compute components s2 = u @ u / (n - k) s = sqrt(s2) gamma0 = s2 * (n - k) / n sigma, *_ = resols.bse sigma2 = sigma**2.0 if sigma <= 0: raise InfeasibleTestException( "The estimated variance of the coefficient in the Phillips-Perron " "regression is 0. This may occur if the series contains constant " "values or the residual variance in the regression is 0." ) rho, *_ = resols.params # 3. Compute statistics self._stat_tau = sqrt(gamma0 / lam2) * ((rho - 1) / sigma) - 0.5 * ( (lam2 - gamma0) / lam ) * (n * sigma / s) self._stat_rho = n * (rho - 1) - 0.5 * (n**2.0 * sigma2 / s2) * (lam2 - gamma0) self._nobs = int(resols.nobs) if self._test_type == "rho": self._stat = self._stat_rho dist_type = "adf-z" else: self._stat = self._stat_tau dist_type = "adf-t" assert self._stat is not None self._pvalue = mackinnonp(self._stat, regression=trend, dist_type=dist_type) critical_values = mackinnoncrit(regression=trend, nobs=n, dist_type=dist_type) self._critical_values = { "1%": critical_values[0], "5%": critical_values[1], "10%": critical_values[2], } self._title = self._test_name + " (Z-" + self._test_type + ")" @property def test_type(self) -> str: """ Gets or sets the test type returned by stat. Valid values are "tau" or "rho" """ return self._test_type @property def regression(self) -> RegressionResults: """ Returns OLS regression results for the specification used in the test The results returned use a Newey-West covariance matrix with the same number of lags as are used in the test statistic. """ self._compute_if_needed() return self._regression
[docs] class KPSS(UnitRootTest, metaclass=AbstractDocStringInheritor): """ Kwiatkowski, Phillips, Schmidt and Shin (KPSS) stationarity test Parameters ---------- y : {ndarray, Series} The data to test for stationarity lags : int, optional The number of lags to use in the Newey-West estimator of the long-run covariance. If omitted or None, the number of lags is calculated with the data-dependent method of Hobijn et al. (1998). See also Andrews (1991), Newey & West (1994), and Schwert (1989). Set lags=-1 to use the old method that only depends on the sample size, 12 * (nobs/100) ** (1/4). trend : {"c", "ct"}, optional The trend component to include in the ADF test "c" - Include a constant (Default) "ct" - Include a constant and linear time trend Notes ----- The null hypothesis of the KPSS test is that the series is weakly stationary and the alternative is that it is non-stationary. If the p-value is above a critical size, then the null cannot be rejected that there and the series appears stationary. The p-values and critical values were computed using an extensive simulation based on 100,000,000 replications using series with 2,000 observations. Examples -------- >>> from arch.unitroot import KPSS >>> import numpy as np >>> import statsmodels.api as sm >>> data = sm.datasets.macrodata.load().data >>> inflation = np.diff(np.log(data["cpi"])) >>> kpss = KPSS(inflation) >>> print("{0:0.4f}".format(kpss.stat)) 0.2870 >>> print("{0:0.4f}".format(kpss.pvalue)) 0.1473 >>> kpss.trend = "ct" >>> print("{0:0.4f}".format(kpss.stat)) 0.2075 >>> print("{0:0.4f}".format(kpss.pvalue)) 0.0128 References ---------- .. [*] Andrews, D.W.K. (1991). "Heteroskedasticity and autocorrelation consistent covariance matrix estimation". Econometrica, 59: 817-858. .. [*] Hobijn, B., Frances, B.H., & Ooms, M. (2004). Generalizations of the KPSS-test for stationarity. Statistica Neerlandica, 52: 483-502. .. [*] Kwiatkowski, D.; Phillips, P. C. B.; Schmidt, P.; Shin, Y. (1992). "Testing the null hypothesis of stationarity against the alternative of a unit root". Journal of Econometrics 54 (1-3), 159-178 .. [*] Newey, W.K., & West, K.D. (1994). "Automatic lag selection in covariance matrix estimation". Review of Economic Studies, 61: 631-653. .. [*] Schwert, G. W. (1989). "Tests for unit roots: A Monte Carlo investigation". Journal of Business and Economic Statistics, 7 (2): 147-159. """ def __init__( self, y: ArrayLike, lags: int | None = None, trend: Literal["c", "ct"] = "c" ) -> None: valid_trends = ("c", "ct") if lags is None: warnings.warn( "Lag selection has changed to use a data-dependent method. To use the " "old method that only depends on time, set lags=-1", DeprecationWarning, ) self._legacy_lag_selection = False if lags == -1: self._legacy_lag_selection = True lags = None super().__init__(y, lags, trend, valid_trends) self._test_name = "KPSS Stationarity Test" self._null_hypothesis = "The process is weakly stationary." self._alternative_hypothesis = "The process contains a unit root." self._resids: ArrayLike1D | None = None def _check_specification(self) -> None: trend_order = len(self._trend) lag_len = 0 if self._lags is None else self._lags required = max(1 + trend_order, lag_len) if self._y.shape[0] < required: raise InfeasibleTestException( f"A minimum of {required} observations are needed to run an ADF with " f"trend {self.trend} and the user-specified number of lags." ) def _compute_statistic(self) -> None: # 1. Estimate model with trend nobs, y, trend = self._nobs, self._y, self._trend z = add_trend(nobs=nobs, trend=trend) res = OLS(y, z).fit() # 2. Compute KPSS test self._resids = u = res.resid if self._lags is None: if self._legacy_lag_selection: self._lags = int(ceil(12.0 * power(nobs / 100.0, 1 / 4.0))) else: self._autolag() assert self._lags is not None if u.shape[0] < self._lags: raise InfeasibleTestException( f"The number of observations {u.shape[0]} is less than the number of" f"lags in the long-run covariance estimator, {self._lags}. You must " "have lags <= nobs." ) lam = cov_nw(u, self._lags, demean=False) s = cumsum(u) self._stat = 1 / (nobs**2.0) * (s**2.0).sum() / lam self._nobs = u.shape[0] assert self._stat is not None if trend == "c": lit_trend: Literal["c", "ct"] = "c" else: lit_trend = "ct" self._pvalue, critical_values = kpss_crit(self._stat, lit_trend) self._critical_values = { "1%": critical_values[0], "5%": critical_values[1], "10%": critical_values[2], } def _autolag(self) -> None: """ Computes the number of lags for covariance matrix estimation in KPSS test using method of Hobijn et al (1998). See also Andrews (1991), Newey & West (1994), and Schwert (1989). Assumes Bartlett / Newey-West kernel. Written by Jim Varanelli """ resids = self._resids assert resids is not None covlags = int(power(self._nobs, 2.0 / 9.0)) s0 = sum(resids**2) / self._nobs s1 = 0 for i in range(1, covlags + 1): resids_prod = resids[i:] @ resids[: self._nobs - i] resids_prod /= self._nobs / 2 s0 += resids_prod s1 += i * resids_prod if s0 <= 0: raise InfeasibleTestException( "Residuals are all zero and so automatic bandwidth selection cannot " "be used. This is usually an indication that the series being testes " "is too small or have constant values." ) s_hat = s1 / s0 pwr = 1.0 / 3.0 gamma_hat = 1.1447 * power(s_hat * s_hat, pwr) autolags = amin([self._nobs, int(gamma_hat * power(self._nobs, pwr))]) self._lags = int(autolags)
[docs] class ZivotAndrews(UnitRootTest, metaclass=AbstractDocStringInheritor): """ Zivot-Andrews structural-break unit-root test The Zivot-Andrews test can be used to test for a unit root in a univariate process in the presence of serial correlation and a single structural break. Parameters ---------- y : array_like data series lags : int, optional The number of lags to use in the ADF regression. If omitted or None, `method` is used to automatically select the lag length with no more than `max_lags` are included. trend : {"c", "t", "ct"}, optional The trend component to include in the test - "c" - Include a constant (Default) - "t" - Include a linear time trend - "ct" - Include a constant and linear time trend trim : float percentage of series at begin/end to exclude from break-period calculation in range [0, 0.333] (default=0.15) max_lags : int, optional The maximum number of lags to use when selecting lag length method : {"AIC", "BIC", "t-stat"}, optional The method to use when selecting the lag length - "AIC" - Select the minimum of the Akaike IC - "BIC" - Select the minimum of the Schwarz/Bayesian IC - "t-stat" - Select the minimum of the Schwarz/Bayesian IC Notes ----- H0 = unit root with a single structural break Algorithm follows Baum (2004/2015) approximation to original Zivot-Andrews method. Rather than performing an autolag regression at each candidate break period (as per the original paper), a single autolag regression is run up-front on the base model (constant + trend with no dummies) to determine the best lag length. This lag length is then used for all subsequent break-period regressions. This results in significant run time reduction but also slightly more pessimistic test statistics than the original Zivot-Andrews method, No attempt has been made to characterize the size/power trade-off. References ---------- .. [*] Baum, C.F. (2004). ZANDREWS: Stata module to calculate Zivot-Andrews unit root test in presence of structural break," Statistical Software Components S437301, Boston College Department of Economics, revised 2015. .. [*] Schwert, G.W. (1989). Tests for unit roots: A Monte Carlo investigation. Journal of Business & Economic Statistics, 7: 147-159. .. [*] Zivot, E., and Andrews, D.W.K. (1992). Further evidence on the great crash, the oil-price shock, and the unit-root hypothesis. Journal of Business & Economic Studies, 10: 251-270. """ def __init__( self, y: ArrayLike, lags: int | None = None, trend: Literal["c", "ct", "t"] = "c", trim: float = 0.15, max_lags: int | None = None, method: Literal["aic", "bic", "t-stat"] = "aic", ) -> None: super().__init__(y, lags, trend, ("c", "t", "ct")) if not isinstance(trim, float) or not 0 <= trim <= (1 / 3): raise ValueError("trim must be a float in range [0, 1/3]") self._trim = trim self._max_lags = max_lags self._method = method self._test_name = "Zivot-Andrews" self._all_stats = full(self._y.shape[0], nan) self._null_hypothesis = ( "The process contains a unit root with a single structural break." ) self._alternative_hypothesis = "The process is trend and break stationary." @staticmethod def _quick_ols(endog: Float64Array, exog: Float64Array) -> Float64Array: """ Minimal implementation of LS estimator for internal use """ xpxi = inv(exog.T @ exog) xpy = exog.T @ endog nobs, k_exog = exog.shape b = xpxi @ xpy e = endog - exog @ b sigma2 = e.T @ e / (nobs - k_exog) return b / sqrt(diag(sigma2 * xpxi)) def _check_specification(self) -> None: trend_order = len(self._trend) lag_len = 0 if self._lags is None else self._lags required = 3 + trend_order + lag_len if self._y.shape[0] < required: raise InfeasibleTestException( f"A minimum of {required} observations are needed to run an ADF with " f"trend {self.trend} and the user-specified number of lags." ) def _compute_statistic(self) -> None: """This is the core routine that computes the test statistic, computes the p-value and constructs the critical values. """ trim = self._trim trend = self._trend y = self._y y_2d = ensure2d(y, "y") nobs = y_2d.shape[0] if self._lags is not None: baselags = self._lags else: adf = ADF(self._y, max_lags=self._max_lags, trend="ct", method=self._method) self._lags = baselags = adf.lags trimcnt = int(nobs * trim) start_period = trimcnt end_period = nobs - trimcnt if trend == "ct": basecols = 5 else: basecols = 4 # first-diff y and standardize for numerical stability dy = diff(y_2d, axis=0)[:, 0] dy /= sqrt(dy.T @ dy) y_2d = y_2d / sqrt(y_2d.T @ y_2d) # reserve exog space exog = empty((dy[baselags:].shape[0], basecols + baselags)) # normalize constant for stability in long time series c_const = 1 / sqrt(nobs) # Normalize exog[:, 0] = c_const # lagged y and dy exog[:, basecols - 1] = y_2d[baselags : (nobs - 1), 0] exog[:, basecols:] = lagmat(dy, baselags, trim="none")[ baselags : exog.shape[0] + baselags ] # better time trend: t_const @ t_const = 1 for large nobs t_const = arange(1.0, nobs + 2) t_const *= sqrt(3) / nobs ** (3 / 2) # iterate through the time periods stats = full(end_period + 1, inf) for bp in range(start_period + 1, end_period + 1): # update intercept dummy / trend / trend dummy cutoff = bp - (baselags + 1) if cutoff <= 0: raise InfeasibleTestException( f"The number of observations is too small to use the Zivot-Andrews " f"test with trend {trend} and {self._lags} lags." ) if trend != "t": exog[:cutoff, 1] = 0 exog[cutoff:, 1] = c_const exog[:, 2] = t_const[(baselags + 2) : (nobs + 1)] if trend == "ct": exog[:cutoff, 3] = 0 exog[cutoff:, 3] = t_const[1 : (nobs - bp + 1)] else: exog[:, 1] = t_const[(baselags + 2) : (nobs + 1)] exog[: (cutoff - 1), 2] = 0 exog[(cutoff - 1) :, 2] = t_const[0 : (nobs - bp + 1)] # check exog rank on first iteration if bp == start_period + 1: rank = matrix_rank(exog) if rank < exog.shape[1]: raise InfeasibleTestException( f"The regressor matrix is singular. The can happen if the data " "contains regions of constant observations, if the number of " f"lags ({self._lags}) is too large, or if the series is very " "short." ) stats[bp] = self._quick_ols(dy[baselags:], exog)[basecols - 1] # return best seen self._all_stats[start_period + 1 : end_period + 1] = stats[ start_period + 1 : end_period + 1 ] self._stat = float(amin(stats)) self._cv_interpolate() def _cv_interpolate(self) -> None: """ Linear interpolation for Zivot-Andrews p-values and critical values Notes ----- The p-values are linearly interpolated from the quantiles of the simulated ZA test statistic distribution """ table = za_critical_values[self._trend] y = table[:, 0] x = table[:, 1] # ZA cv table contains quantiles multiplied by 100 self._pvalue = float(interp(self.stat, x, y)) / 100.0 cv = [1.0, 5.0, 10.0] crit_value = interp(cv, y, x) self._critical_values = { "1%": crit_value[0], "5%": crit_value[1], "10%": crit_value[2], }
[docs] class VarianceRatio(UnitRootTest, metaclass=AbstractDocStringInheritor): """ Variance Ratio test of a random walk. Parameters ---------- y : {ndarray, Series} The data to test for a random walk lags : int The number of periods to used in the multi-period variance, which is the numerator of the test statistic. Must be at least 2 trend : {"n", "c"}, optional "c" allows for a non-zero drift in the random walk, while "n" requires that the increments to y are mean 0 overlap : bool, optional Indicates whether to use all overlapping blocks. Default is True. If False, the number of observations in y minus 1 must be an exact multiple of lags. If this condition is not satisfied, some values at the end of y will be discarded. robust : bool, optional Indicates whether to use heteroskedasticity robust inference. Default is True. debiased : bool, optional Indicates whether to use a debiased version of the test. Default is True. Only applicable if overlap is True. Notes ----- The null hypothesis of a VR is that the process is a random walk, possibly plus drift. Rejection of the null with a positive test statistic indicates the presence of positive serial correlation in the time series. Examples -------- >>> from arch.unitroot import VarianceRatio >>> import pandas_datareader as pdr >>> data = pdr.get_data_fred("DJIA", start="2010-1-1", end="2020-12-31") >>> data = np.log(data.resample("M").last()) # End of month >>> vr = VarianceRatio(data, lags=12) >>> print("{0:0.4f}".format(vr.pvalue)) 0.1370 References ---------- .. [*] Campbell, John Y., Lo, Andrew W. and MacKinlay, A. Craig. (1997) The Econometrics of Financial Markets. Princeton, NJ: Princeton University Press. """ def __init__( self, y: ArrayLike, lags: int = 2, trend: Literal["n", "c"] = "c", debiased: bool = True, robust: bool = True, overlap: bool = True, ) -> None: if lags < 2: raise ValueError("lags must be an integer larger than 2") valid_trends = ("n", "c") super().__init__(y, lags, trend, valid_trends) self._test_name = "Variance-Ratio Test" self._null_hypothesis = "The process is a random walk." self._alternative_hypothesis = "The process is not a random walk." self._robust = robust self._debiased = debiased self._overlap = overlap self._vr: float | None = None self._stat_variance: float | None = None quantiles = array([0.01, 0.05, 0.1, 0.9, 0.95, 0.99]) for q, cv in zip(quantiles, norm.ppf(quantiles)): self._critical_values[str(int(100 * q)) + "%"] = cv @property def vr(self) -> float: """The ratio of the long block lags-period variance to the 1-period variance""" self._compute_if_needed() assert self._vr is not None return self._vr @property def overlap(self) -> bool: """Sets of gets the indicator to use overlapping returns in the long-period variance estimator""" return self._overlap @property def robust(self) -> bool: """Sets of gets the indicator to use a heteroskedasticity robust variance estimator""" return self._robust @property def debiased(self) -> bool: """Sets of gets the indicator to use debiased variances in the ratio""" return self._debiased def _check_specification(self) -> None: assert self._lags is not None lags = self._lags required = 2 * lags if not self._overlap else lags + 1 + int(self.debiased) if self._y.shape[0] < required: raise InfeasibleTestException( f"A minimum of {required} observations are needed to run an ADF with " f"trend {self.trend} and the user-specified number of lags." ) def _compute_statistic(self) -> None: overlap, debiased, robust = self._overlap, self._debiased, self._robust y, nobs, q, trend = self._y, self._nobs, self._lags, self._trend assert q is not None nq = nobs - 1 if not overlap: # Check length of y if nq % q != 0: extra = nq % q y = y[:-extra] warnings.warn( invalid_length_doc.format(var="y", block=q, drop=extra), InvalidLengthWarning, ) nobs = y.shape[0] if trend == "n": mu = 0 else: mu = (y[-1] - y[0]) / (nobs - 1) delta_y = diff(y) nq = delta_y.shape[0] sigma2_1 = sum((delta_y - mu) ** 2.0) / nq if not overlap: delta_y_q = y[q::q] - y[0:-q:q] sigma2_q = sum((delta_y_q - q * mu) ** 2.0) / nq self._summary_text = ["Computed with non-overlapping blocks"] else: delta_y_q = y[q:] - y[:-q] sigma2_q = sum((delta_y_q - q * mu) ** 2.0) / (nq * q) self._summary_text = ["Computed with overlapping blocks"] if debiased and overlap: sigma2_1 *= nq / (nq - 1) m = q * (nq - q + 1) * (1 - (q / nq)) sigma2_q *= (nq * q) / m self._summary_text = ["Computed with overlapping blocks (de-biased)"] if not overlap: self._stat_variance = 2.0 * (q - 1) elif not robust: # GH 286, CLM 2.4.39 self._stat_variance = (2 * (2 * q - 1) * (q - 1)) / (3 * q) else: z2 = (delta_y - mu) ** 2.0 scale = sum(z2) ** 2.0 theta = 0.0 for k in range(1, q): delta = nq * z2[k:] @ z2[:-k] / scale # GH 286, CLM 2.4.43 theta += 4 * (1 - k / q) ** 2.0 * delta self._stat_variance = theta self._vr = sigma2_q / sigma2_1 assert self._vr is not None self._stat = sqrt(nq) * (self._vr - 1) / sqrt(self._stat_variance) assert self._stat is not None abs_stat = float(abs(self._stat)) self._pvalue = 2 - 2 * norm.cdf(abs(abs_stat))
def mackinnonp( stat: float, regression: Literal["c", "n", "ct", "ctt"] = "c", num_unit_roots: int = 1, dist_type: Literal["adf-t", "adf-z", "dfgls"] = "adf-t", ) -> float: """ Returns MacKinnon's approximate p-value for test stat. Parameters ---------- stat : float "T-value" from an Augmented Dickey-Fuller or DFGLS regression. regression : {"c", "n", "ct", "ctt"} This is the method of regression that was used. Following MacKinnon's notation, this can be "c" for constant, "n" for no constant, "ct" for constant and trend, and "ctt" for constant, trend, and trend-squared. num_unit_roots : int The number of series believed to be I(1). For (Augmented) Dickey- Fuller N = 1. dist_type : {"adf-t", "adf-z", "dfgls"} The test type to use when computing p-values. Options include "ADF-t" - ADF t-stat based bootstrap "ADF-z" - ADF z bootstrap "DFGLS" - GLS detrended Dickey Fuller Returns ------- p-value : float The p-value for the ADF statistic estimated using MacKinnon 1994. References ---------- MacKinnon, J.G. 1994 "Approximate Asymptotic Distribution Functions for Unit-Root and Cointegration Tests." Journal of Business & Economics Statistics, 12.2, 167-76. Notes ----- Most values are from MacKinnon (1994). Values for DFGLS test statistics and the "n" version of the ADF z test statistic were computed following the methodology of MacKinnon (1994). """ dist_type = cast(Literal["adf-t", "adf-z", "dfgls"], dist_type.lower()) if num_unit_roots > 1 and dist_type.lower() != "adf-t": raise ValueError( "Cointegration results (num_unit_roots > 1) are" + "only available for ADF-t values" ) if dist_type == "adf-t": maxstat = tau_max[regression][num_unit_roots - 1] minstat = tau_min[regression][num_unit_roots - 1] starstat = tau_star[regression][num_unit_roots - 1] small_p = tau_small_p[regression][num_unit_roots - 1] large_p = tau_large_p[regression][num_unit_roots - 1] elif dist_type == "adf-z": maxstat = adf_z_max[regression] minstat = adf_z_min[regression] starstat = adf_z_star[regression] small_p = adf_z_small_p[regression] large_p = adf_z_large_p[regression] elif dist_type == "dfgls": maxstat = dfgls_tau_max[regression] minstat = dfgls_tau_min[regression] starstat = dfgls_tau_star[regression] small_p = dfgls_small_p[regression] large_p = dfgls_large_p[regression] else: raise ValueError(f"Unknown test type {dist_type}") if stat > maxstat: return 1.0 elif stat < minstat: return 0.0 if stat <= starstat: poly_coef = small_p if dist_type == "adf-z": stat = float(log(abs(stat))) # Transform stat for small p ADF-z else: poly_coef = large_p return norm.cdf(polyval(poly_coef[::-1], stat)) def mackinnoncrit( num_unit_roots: int = 1, regression: Literal["c", "n", "ct", "ctt"] = "c", nobs: float = inf, dist_type: Literal["adf-t", "adf-z", "dfgls"] = "adf-t", ) -> Float64Array: """ Returns the critical values for cointegrating and the ADF test. In 2010 MacKinnon updated the values of his 1994 paper with critical values for the augmented Dickey-Fuller bootstrap. These new values are to be preferred and are used here. Parameters ---------- num_unit_roots : int The number of series of I(1) series for which the null of non-cointegration is being tested. For N > 12, the critical values are linearly interpolated (not yet implemented). For the ADF test, N = 1. regression : {"c", "ct", "ctt", "n"}, optional Following MacKinnon (1996), these stand for the type of regression run. "c" for constant and no trend, "ct" for constant with a linear trend, "ctt" for constant with a linear and quadratic trend, and "n" for no constant. The values for the no constant case are taken from the 1996 paper, as they were not updated for 2010 due to the unrealistic assumptions that would underlie such a case. nobs : {int, np.inf}, optional This is the sample size. If the sample size is numpy.inf, then the asymptotic critical values are returned. dist_type : {"adf-t", "adf-z", "dfgls"}, optional Type of test statistic Returns ------- crit_vals : ndarray Three critical values corresponding to 1%, 5% and 10% cut-offs. Notes ----- Results for ADF t-stats from MacKinnon (1994,2010). Results for DFGLS and ADF z-bootstrap use the same methodology as MacKinnon. References ---------- MacKinnon, J.G. 1994 "Approximate Asymptotic Distribution Functions for Unit-Root and Cointegration Tests." Journal of Business & Economics Statistics, 12.2, 167-76. MacKinnon, J.G. 2010. "Critical Values for Cointegration Tests." Queen's University, Dept of Economics Working Papers 1227. https://ideas.repec.org/p/qed/wpaper/1227.html """ lower_dist_type = dist_type.lower() valid_regression = ["c", "ct", "n", "ctt"] if lower_dist_type == "dfgls": valid_regression = ["c", "ct"] if regression not in valid_regression: raise ValueError(f"regression keyword {regression} not understood") if lower_dist_type == "adf-t": asymptotic_cv = tau_2010[regression][num_unit_roots - 1, :, 0] poly_coef = tau_2010[regression][num_unit_roots - 1, :, :].T elif lower_dist_type == "adf-z": poly_coef = array(adf_z_cv_approx[regression]).T asymptotic_cv = array(adf_z_cv_approx[regression])[:, 0] elif lower_dist_type == "dfgls": poly_coef = dfgls_cv_approx[regression].T asymptotic_cv = dfgls_cv_approx[regression][:, 0] else: raise ValueError(f"Unknown test type {dist_type}") if nobs is inf: return asymptotic_cv else: # Flip so that highest power to lowest power return polyval(poly_coef[::-1], 1.0 / nobs) def kpss_crit( stat: float, trend: Literal["c", "ct"] = "c" ) -> tuple[float, Float64Array]: """ Linear interpolation for KPSS p-values and critical values Parameters ---------- stat : float The KPSS test statistic. trend : {"c","ct"} The trend used when computing the KPSS statistic Returns ------- pvalue : float The interpolated p-value crit_val : ndarray Three element array containing the 10%, 5% and 1% critical values, in order Notes ----- The p-values are linear interpolated from the quantiles of the simulated KPSS test statistic distribution using 100,000,000 replications and 2000 data points. """ table = kpss_critical_values[trend] y = table[:, 0] x = table[:, 1] # kpss.py contains quantiles multiplied by 100 pvalue = float(interp(stat, x, y)) / 100.0 cv = [1.0, 5.0, 10.0] crit_value = interp(cv, y[::-1], x[::-1]) return pvalue, crit_value
[docs] def auto_bandwidth( y: Sequence[float | int] | ArrayLike1D, kernel: Literal[ "ba", "bartlett", "nw", "pa", "parzen", "gallant", "qs", "andrews" ] = "ba", ) -> float: """ Automatic bandwidth selection of Andrews (1991) and Newey & West (1994). Parameters ---------- y : {ndarray, Series} Data on which to apply the bandwidth selection kernel : str The kernel function to use for selecting the bandwidth - "ba", "bartlett", "nw": Bartlett kernel (default) - "pa", "parzen", "gallant": Parzen kernel - "qs", "andrews": Quadratic Spectral kernel Returns ------- float The estimated optimal bandwidth. """ y = ensure1d(y, "y") if y.shape[0] < 2: raise ValueError("Data must contain more than one observation") lower_kernel = kernel.lower() if lower_kernel in ("ba", "bartlett", "nw"): kernel = "ba" n_power = 2 / 9 elif lower_kernel in ("pa", "parzen", "gallant"): kernel = "pa" n_power = 4 / 25 elif lower_kernel in ("qs", "andrews"): kernel = "qs" n_power = 2 / 25 else: raise ValueError("Unknown kernel") n = int(4 * ((len(y) / 100) ** n_power)) sig = (n + 1) * [0] for i in range(n + 1): a = list(y[i:]) b = list(y[: len(y) - i]) sig[i] = int(npsum([i * j for (i, j) in zip(a, b)])) sigma_m1 = sig[1 : len(sig)] # sigma without the 1st element s0 = sig[0] + 2 * sum(sigma_m1) if kernel == "ba": s1 = 0 for j in range(len(sigma_m1)): s1 += (j + 1) * sigma_m1[j] s1 *= 2 q = 1 t_power = 1 / (2 * q + 1) gamma = 1.1447 * (((s1 / s0) ** 2) ** t_power) else: s2 = 0 for j in range(len(sigma_m1)): s2 += ((j + 1) ** 2) * sigma_m1[j] s2 *= 2 q = 2 t_power = 1 / (2 * q + 1) if kernel == "pa": gamma = 2.6614 * (((s2 / s0) ** 2) ** t_power) else: # kernel == "qs": gamma = 1.3221 * (((s2 / s0) ** 2) ** t_power) bandwidth = gamma * power(len(y), t_power) return bandwidth