Source code for zipline.pipeline.factors.statistical

from numexpr import evaluate
import numpy as np
from numpy import broadcast_arrays
from scipy.stats import (
    linregress,
    spearmanr,
)

from zipline.assets import Asset
from zipline.errors import IncompatibleTerms
from zipline.pipeline.factors import CustomFactor
from zipline.pipeline.filters import SingleAsset
from zipline.pipeline.mixins import StandardOutputs
from zipline.pipeline.sentinels import NotSpecified
from zipline.pipeline.term import AssetExists
from zipline.utils.input_validation import (
    expect_bounded,
    expect_dtypes,
    expect_types,
)
from zipline.utils.math_utils import nanmean
from zipline.utils.numpy_utils import (
    float64_dtype,
    int64_dtype,
)


from .basic import Returns


ALLOWED_DTYPES = (float64_dtype, int64_dtype)


class _RollingCorrelation(CustomFactor):
    @expect_dtypes(base_factor=ALLOWED_DTYPES, target=ALLOWED_DTYPES)
    @expect_bounded(correlation_length=(2, None))
    def __new__(cls, base_factor, target, correlation_length, mask=NotSpecified):
        if target.ndim == 2 and base_factor.mask is not target.mask:
            raise IncompatibleTerms(term_1=base_factor, term_2=target)

        return super(_RollingCorrelation, cls).__new__(
            cls,
            inputs=[base_factor, target],
            window_length=correlation_length,
            mask=mask,
        )


[docs]class RollingPearson(_RollingCorrelation): """ A Factor that computes pearson correlation coefficients between the columns of a given Factor and either the columns of another Factor/BoundColumn or a slice/single column of data. Parameters ---------- base_factor : zipline.pipeline.Factor The factor for which to compute correlations of each of its columns with `target`. target : zipline.pipeline.Term with a numeric dtype The term with which to compute correlations against each column of data produced by `base_factor`. This term may be a Factor, a BoundColumn or a Slice. If `target` is two-dimensional, correlations are computed asset-wise. correlation_length : int Length of the lookback window over which to compute each correlation coefficient. mask : zipline.pipeline.Filter, optional A Filter describing which assets (columns) of `base_factor` should have their correlation with `target` computed each day. See Also -------- :func:`scipy.stats.pearsonr` :meth:`Factor.pearsonr` :class:`zipline.pipeline.factors.RollingPearsonOfReturns` Notes ----- Most users should call Factor.pearsonr rather than directly construct an instance of this class. """ window_safe = True
[docs] def compute(self, today, assets, out, base_data, target_data): vectorized_pearson_r( base_data, target_data, allowed_missing=0, out=out, )
[docs]class RollingSpearman(_RollingCorrelation): """ A Factor that computes spearman rank correlation coefficients between the columns of a given Factor and either the columns of another Factor/BoundColumn or a slice/single column of data. Parameters ---------- base_factor : zipline.pipeline.Factor The factor for which to compute correlations of each of its columns with `target`. target : zipline.pipeline.Term with a numeric dtype The term with which to compute correlations against each column of data produced by `base_factor`. This term may be a Factor, a BoundColumn or a Slice. If `target` is two-dimensional, correlations are computed asset-wise. correlation_length : int Length of the lookback window over which to compute each correlation coefficient. mask : zipline.pipeline.Filter, optional A Filter describing which assets (columns) of `base_factor` should have their correlation with `target` computed each day. See Also -------- :func:`scipy.stats.spearmanr` :meth:`Factor.spearmanr` :class:`zipline.pipeline.factors.RollingSpearmanOfReturns` Notes ----- Most users should call Factor.spearmanr rather than directly construct an instance of this class. """ window_safe = True
[docs] def compute(self, today, assets, out, base_data, target_data): # If `target_data` is a Slice or single column of data, broadcast it # out to the same shape as `base_data`, then compute column-wise. This # is efficient because each column of the broadcasted array only refers # to a single memory location. target_data = broadcast_arrays(target_data, base_data)[0] for i in range(len(out)): out[i] = spearmanr(base_data[:, i], target_data[:, i])[0]
class RollingLinearRegression(CustomFactor): """ A Factor that performs an ordinary least-squares regression predicting the columns of a given Factor from either the columns of another Factor/BoundColumn or a slice/single column of data. Parameters ---------- dependent : zipline.pipeline.Factor The factor whose columns are the predicted/dependent variable of each regression with `independent`. independent : zipline.pipeline.slice.Slice or zipline.pipeline.Factor The factor/slice whose columns are the predictor/independent variable of each regression with `dependent`. If `independent` is a Factor, regressions are computed asset-wise. regression_length : int Length of the lookback window over which to compute each regression. mask : zipline.pipeline.Filter, optional A Filter describing which assets (columns) of `dependent` should be regressed against `independent` each day. See Also -------- :func:`scipy.stats.linregress` :meth:`Factor.linear_regression` :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns` Notes ----- Most users should call Factor.linear_regression rather than directly construct an instance of this class. """ outputs = ["alpha", "beta", "r_value", "p_value", "stderr"] @expect_dtypes(dependent=ALLOWED_DTYPES, independent=ALLOWED_DTYPES) @expect_bounded(regression_length=(2, None)) def __new__(cls, dependent, independent, regression_length, mask=NotSpecified): if independent.ndim == 2 and dependent.mask is not independent.mask: raise IncompatibleTerms(term_1=dependent, term_2=independent) return super(RollingLinearRegression, cls).__new__( cls, inputs=[dependent, independent], window_length=regression_length, mask=mask, ) def compute(self, today, assets, out, dependent, independent): alpha = out.alpha beta = out.beta r_value = out.r_value p_value = out.p_value stderr = out.stderr def regress(y, x): regr_results = linregress(y=y, x=x) # `linregress` returns its results in the following order: # slope, intercept, r-value, p-value, stderr alpha[i] = regr_results[1] beta[i] = regr_results[0] r_value[i] = regr_results[2] p_value[i] = regr_results[3] stderr[i] = regr_results[4] # If `independent` is a Slice or single column of data, broadcast it # out to the same shape as `dependent`, then compute column-wise. This # is efficient because each column of the broadcasted array only refers # to a single memory location. independent = broadcast_arrays(independent, dependent)[0] for i in range(len(out)): regress(y=dependent[:, i], x=independent[:, i])
[docs]class RollingPearsonOfReturns(RollingPearson): """ Calculates the Pearson product-moment correlation coefficient of the returns of the given asset with the returns of all other assets. Pearson correlation is what most people mean when they say "correlation coefficient" or "R-value". Parameters ---------- target : zipline.assets.Asset The asset to correlate with all other assets. returns_length : int >= 2 Length of the lookback window over which to compute returns. Daily returns require a window length of 2. correlation_length : int >= 1 Length of the lookback window over which to compute each correlation coefficient. mask : zipline.pipeline.Filter, optional A Filter describing which assets should have their correlation with the target asset computed each day. Notes ----- Computing this factor over many assets can be time consuming. It is recommended that a mask be used in order to limit the number of assets over which correlations are computed. Examples -------- Let the following be example 10-day returns for three different assets:: SPY MSFT FB 2017-03-13 -.03 .03 .04 2017-03-14 -.02 -.03 .02 2017-03-15 -.01 .02 .01 2017-03-16 0 -.02 .01 2017-03-17 .01 .04 -.01 2017-03-20 .02 -.03 -.02 2017-03-21 .03 .01 -.02 2017-03-22 .04 -.02 -.02 Suppose we are interested in SPY's rolling returns correlation with each stock from 2017-03-17 to 2017-03-22, using a 5-day look back window (that is, we calculate each correlation coefficient over 5 days of data). We can achieve this by doing:: rolling_correlations = RollingPearsonOfReturns( target=sid(8554), returns_length=10, correlation_length=5, ) The result of computing ``rolling_correlations`` from 2017-03-17 to 2017-03-22 gives:: SPY MSFT FB 2017-03-17 1 .15 -.96 2017-03-20 1 .10 -.96 2017-03-21 1 -.16 -.94 2017-03-22 1 -.16 -.85 Note that the column for SPY is all 1's, as the correlation of any data series with itself is always 1. To understand how each of the other values were calculated, take for example the .15 in MSFT's column. This is the correlation coefficient between SPY's returns looking back from 2017-03-17 (-.03, -.02, -.01, 0, .01) and MSFT's returns (.03, -.03, .02, -.02, .04). See Also -------- :class:`zipline.pipeline.factors.RollingSpearmanOfReturns` :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns` """ def __new__(cls, target, returns_length, correlation_length, mask=NotSpecified): # Use the `SingleAsset` filter here because it protects against # inputting a non-existent target asset. returns = Returns( window_length=returns_length, mask=(AssetExists() | SingleAsset(asset=target)), ) return super(RollingPearsonOfReturns, cls).__new__( cls, base_factor=returns, target=returns[target], correlation_length=correlation_length, mask=mask, )
[docs]class RollingSpearmanOfReturns(RollingSpearman): """ Calculates the Spearman rank correlation coefficient of the returns of the given asset with the returns of all other assets. Parameters ---------- target : zipline.assets.Asset The asset to correlate with all other assets. returns_length : int >= 2 Length of the lookback window over which to compute returns. Daily returns require a window length of 2. correlation_length : int >= 1 Length of the lookback window over which to compute each correlation coefficient. mask : zipline.pipeline.Filter, optional A Filter describing which assets should have their correlation with the target asset computed each day. Notes ----- Computing this factor over many assets can be time consuming. It is recommended that a mask be used in order to limit the number of assets over which correlations are computed. See Also -------- :class:`zipline.pipeline.factors.RollingPearsonOfReturns` :class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns` """ def __new__(cls, target, returns_length, correlation_length, mask=NotSpecified): # Use the `SingleAsset` filter here because it protects against # inputting a non-existent target asset. returns = Returns( window_length=returns_length, mask=(AssetExists() | SingleAsset(asset=target)), ) return super(RollingSpearmanOfReturns, cls).__new__( cls, base_factor=returns, target=returns[target], correlation_length=correlation_length, mask=mask, )
[docs]class RollingLinearRegressionOfReturns(RollingLinearRegression): """Perform an ordinary least-squares regression predicting the returns of all other assets on the given asset. Parameters ---------- target : zipline.assets.Asset The asset to regress against all other assets. returns_length : int >= 2 Length of the lookback window over which to compute returns. Daily returns require a window length of 2. regression_length : int >= 1 Length of the lookback window over which to compute each regression. mask : zipline.pipeline.Filter, optional A Filter describing which assets should be regressed against the target asset each day. Notes ----- Computing this factor over many assets can be time consuming. It is recommended that a mask be used in order to limit the number of assets over which regressions are computed. This factor is designed to return five outputs: - alpha, a factor that computes the intercepts of each regression. - beta, a factor that computes the slopes of each regression. - r_value, a factor that computes the correlation coefficient of each regression. - p_value, a factor that computes, for each regression, the two-sided p-value for a hypothesis test whose null hypothesis is that the slope is zero. - stderr, a factor that computes the standard error of the estimate of each regression. For more help on factors with multiple outputs, see :class:`zipline.pipeline.CustomFactor`. Examples -------- Let the following be example 10-day returns for three different assets:: SPY MSFT FB 2017-03-13 -.03 .03 .04 2017-03-14 -.02 -.03 .02 2017-03-15 -.01 .02 .01 2017-03-16 0 -.02 .01 2017-03-17 .01 .04 -.01 2017-03-20 .02 -.03 -.02 2017-03-21 .03 .01 -.02 2017-03-22 .04 -.02 -.02 Suppose we are interested in predicting each stock's returns from SPY's over rolling 5-day look back windows. We can compute rolling regression coefficients (alpha and beta) from 2017-03-17 to 2017-03-22 by doing:: regression_factor = RollingRegressionOfReturns( target=sid(8554), returns_length=10, regression_length=5, ) alpha = regression_factor.alpha beta = regression_factor.beta The result of computing ``alpha`` from 2017-03-17 to 2017-03-22 gives:: SPY MSFT FB 2017-03-17 0 .011 .003 2017-03-20 0 -.004 .004 2017-03-21 0 .007 .006 2017-03-22 0 .002 .008 And the result of computing ``beta`` from 2017-03-17 to 2017-03-22 gives:: SPY MSFT FB 2017-03-17 1 .3 -1.1 2017-03-20 1 .2 -1 2017-03-21 1 -.3 -1 2017-03-22 1 -.3 -.9 Note that SPY's column for alpha is all 0's and for beta is all 1's, as the regression line of SPY with itself is simply the function y = x. To understand how each of the other values were calculated, take for example MSFT's ``alpha`` and ``beta`` values on 2017-03-17 (.011 and .3, respectively). These values are the result of running a linear regression predicting MSFT's returns from SPY's returns, using values starting at 2017-03-17 and looking back 5 days. That is, the regression was run with x = [-.03, -.02, -.01, 0, .01] and y = [.03, -.03, .02, -.02, .04], and it produced a slope of .3 and an intercept of .011. See Also -------- :class:`zipline.pipeline.factors.RollingPearsonOfReturns` :class:`zipline.pipeline.factors.RollingSpearmanOfReturns` """ window_safe = True def __new__(cls, target, returns_length, regression_length, mask=NotSpecified): # Use the `SingleAsset` filter here because it protects against # inputting a non-existent target asset. returns = Returns( window_length=returns_length, mask=(AssetExists() | SingleAsset(asset=target)), ) return super(RollingLinearRegressionOfReturns, cls).__new__( cls, dependent=returns, independent=returns[target], regression_length=regression_length, mask=mask, )
[docs]class SimpleBeta(CustomFactor, StandardOutputs): """Factor producing the slope of a regression line between each asset's daily returns to the daily returns of a single "target" asset. Parameters ---------- target : zipline.Asset Asset against which other assets should be regressed. regression_length : int Number of days of daily returns to use for the regression. allowed_missing_percentage : float, optional Percentage of returns observations (between 0 and 1) that are allowed to be missing when calculating betas. Assets with more than this percentage of returns observations missing will produce values of NaN. Default behavior is that 25% of inputs can be missing. """ window_safe = True dtype = float64_dtype params = ("allowed_missing_count",) @expect_types( target=Asset, regression_length=int, allowed_missing_percentage=(int, float), __funcname="SimpleBeta", ) @expect_bounded( regression_length=(3, None), allowed_missing_percentage=(0.0, 1.0), __funcname="SimpleBeta", ) def __new__(cls, target, regression_length, allowed_missing_percentage=0.25): daily_returns = Returns( window_length=2, mask=(AssetExists() | SingleAsset(asset=target)), ) allowed_missing_count = int(allowed_missing_percentage * regression_length) return super(SimpleBeta, cls).__new__( cls, inputs=[daily_returns, daily_returns[target]], window_length=regression_length, allowed_missing_count=allowed_missing_count, )
[docs] def compute( self, today, assets, out, all_returns, target_returns, allowed_missing_count ): vectorized_beta( dependents=all_returns, independent=target_returns, allowed_missing=allowed_missing_count, out=out, )
[docs] def graph_repr(self): return "{}({!r}, {}, {})".format( type(self).__name__, str(self.target.symbol), # coerce from unicode to str in py2. self.window_length, self.params["allowed_missing_count"], )
@property def target(self): """Get the target of the beta calculation.""" return self.inputs[1].asset def __repr__(self): return "{}({}, length={}, allowed_missing={})".format( type(self).__name__, self.target, self.window_length, self.params["allowed_missing_count"], )
def vectorized_beta(dependents, independent, allowed_missing, out=None): """Compute slopes of linear regressions between columns of ``dependents`` and ``independent``. Parameters ---------- dependents : np.array[N, M] Array with columns of data to be regressed against ``independent``. independent : np.array[N, 1] Independent variable of the regression allowed_missing : int Number of allowed missing (NaN) observations per column. Columns with more than this many non-nan observations in either ``dependents`` or ``independents`` will output NaN as the regression coefficient. out : np.array[M] or None, optional Output array into which to write results. If None, a new array is created and returned. Returns ------- slopes : np.array[M] Linear regression coefficients for each column of ``dependents``. """ # Cache these as locals since we're going to call them multiple times. nan = np.nan isnan = np.isnan N, M = dependents.shape if out is None: out = np.full(M, nan) # Copy N times as a column vector and fill with nans to have the same # missing value pattern as the dependent variable. # # PERF_TODO: We could probably avoid the space blowup by doing this in # Cython. # shape: (N, M) independent = np.where( isnan(dependents), nan, independent, ) # Calculate beta as Cov(X, Y) / Cov(X, X). # https://en.wikipedia.org/wiki/Simple_linear_regression#Fitting_the_regression_line # noqa # # NOTE: The usual formula for covariance is:: # # mean((X - mean(X)) * (Y - mean(Y))) # # However, we don't actually need to take the mean of both sides of the # product, because of the folllowing equivalence:: # # Let X_res = (X - mean(X)). # We have: # # mean(X_res * (Y - mean(Y))) = mean(X_res * (Y - mean(Y))) # (1) = mean((X_res * Y) - (X_res * mean(Y))) # (2) = mean(X_res * Y) - mean(X_res * mean(Y)) # (3) = mean(X_res * Y) - mean(X_res) * mean(Y) # (4) = mean(X_res * Y) - 0 * mean(Y) # (5) = mean(X_res * Y) # # # The tricky step in the above derivation is step (4). We know that # mean(X_res) is zero because, for any X: # # mean(X - mean(X)) = mean(X) - mean(X) = 0. # # The upshot of this is that we only have to center one of `independent` # and `dependent` when calculating covariances. Since we need the centered # `independent` to calculate its variance in the next step, we choose to # center `independent`. # shape: (N, M) ind_residual = independent - nanmean(independent, axis=0) # shape: (M,) covariances = nanmean(ind_residual * dependents, axis=0) # We end up with different variances in each column here because each # column may have a different subset of the data dropped due to missing # data in the corresponding dependent column. # shape: (M,) independent_variances = nanmean(ind_residual**2, axis=0) # shape: (M,) np.divide(covariances, independent_variances, out=out) # Write nans back to locations where we have more then allowed number of # missing entries. nanlocs = isnan(independent).sum(axis=0) > allowed_missing out[nanlocs] = nan return out def vectorized_pearson_r(dependents, independents, allowed_missing, out=None): """Compute Pearson's r between columns of ``dependents`` and ``independents``. Parameters ---------- dependents : np.array[N, M] Array with columns of data to be regressed against ``independent``. independents : np.array[N, M] or np.array[N, 1] Independent variable(s) of the regression. If a single column is passed, it is broadcast to the shape of ``dependents``. allowed_missing : int Number of allowed missing (NaN) observations per column. Columns with more than this many non-nan observations in either ``dependents`` or ``independents`` will output NaN as the correlation coefficient. out : np.array[M] or None, optional Output array into which to write results. If None, a new array is created and returned. Returns ------- correlations : np.array[M] Pearson correlation coefficients for each column of ``dependents``. See Also -------- :class:`zipline.pipeline.factors.RollingPearson` :class:`zipline.pipeline.factors.RollingPearsonOfReturns` """ nan = np.nan isnan = np.isnan N, M = dependents.shape if out is None: out = np.full(M, nan) if allowed_missing > 0: # If we're handling nans robustly, we need to mask both arrays to # locations where either was nan. either_nan = isnan(dependents) | isnan(independents) independents = np.where(either_nan, nan, independents) dependents = np.where(either_nan, nan, dependents) mean = nanmean else: # Otherwise, we can just use mean, which will give us a nan for any # column where there's ever a nan. mean = np.mean # Pearson R is Cov(X, Y) / StdDev(X) * StdDev(Y) # c.f. https://en.wikipedia.org/wiki/Pearson_correlation_coefficient ind_residual = independents - mean(independents, axis=0) dep_residual = dependents - mean(dependents, axis=0) ind_variance = mean(ind_residual**2, axis=0) dep_variance = mean(dep_residual**2, axis=0) covariances = mean(ind_residual * dep_residual, axis=0) evaluate( "where(mask, nan, cov / sqrt(ind_variance * dep_variance))", local_dict={ "cov": covariances, "mask": isnan(independents).sum(axis=0) > allowed_missing, "nan": np.nan, "ind_variance": ind_variance, "dep_variance": dep_variance, }, global_dict={}, out=out, ) return out