Source code for zipline.pipeline.factors.basic

"""Simple common factors.
"""
from numbers import Number
from numpy import (
    arange,
    average,
    clip,
    copyto,
    exp,
    fmax,
    full,
    isnan,
    log,
    NINF,
    sqrt,
    sum as np_sum,
    unique,
    errstate as np_errstate,
)

from zipline.pipeline.data import EquityPricing
from zipline.utils.input_validation import expect_types
from zipline.utils.math_utils import (
    nanargmax,
    nanmax,
    nanmean,
    nanstd,
    nansum,
)
from zipline.utils.numpy_utils import (
    float64_dtype,
    ignore_nanwarnings,
)

from .factor import CustomFactor
from ..mixins import SingleInputMixin


class Returns(CustomFactor):
    """
    Percent change in close price across the lookback window.

    The value written for each asset is ``(last - first) / first``, where
    ``first`` and ``last`` are the first and last rows of the window.

    **Default Inputs**: [EquityPricing.close]
    """

    inputs = [EquityPricing.close]
    window_safe = True

    def _validate(self):
        """
        Run the parent validation, then reject windows shorter than 2 rows.

        Raises
        ------
        ValueError
            If ``window_length`` is less than 2.
        """
        super(Returns, self)._validate()
        if self.window_length < 2:
            message = (
                "'Returns' expected a window length of at least 2, but was "
                "given {window_length}. For daily returns, use a window "
                "length of 2."
            )
            raise ValueError(message.format(window_length=self.window_length))

    def compute(self, today, assets, out, close):
        """Write the relative change from the first to the last row."""
        first_close = close[0]
        last_close = close[-1]
        out[:] = (last_close - first_close) / first_close
class PercentChange(SingleInputMixin, CustomFactor):
    """
    Calculates the percent change over the given window_length.

    **Default Inputs:** None

    **Default Window Length:** None

    Notes
    -----
    Percent change is calculated as ``(new - old) / abs(old)``, where
    ``old`` is the first row of the window and ``new`` is the last row.
    """

    window_safe = True

    def _validate(self):
        """
        Run the parent validation, then reject windows shorter than 2 rows.

        Raises
        ------
        ValueError
            If ``window_length`` is less than 2.
        """
        super(PercentChange, self)._validate()
        if self.window_length < 2:
            # Each fragment ends with a space so the implicitly concatenated
            # message reads as separate words.
            raise ValueError(
                "'PercentChange' expected a window length "
                "of at least 2, but was given {window_length}. "
                "For daily percent change, use a window "
                "length of 2.".format(window_length=self.window_length)
            )

    def compute(self, today, assets, out, values):
        """Write ``(last - first) / abs(first)`` for each column of ``values``."""
        # A zero or NaN starting value produces inf/NaN; silence the
        # floating-point warnings instead of raising or printing them.
        with np_errstate(divide="ignore", invalid="ignore"):
            out[:] = (values[-1] - values[0]) / abs(values[0])
class DailyReturns(Returns):
    """
    One-day percent change in close price.

    This is ``Returns`` with the window length fixed at 2.

    **Default Inputs**: [EquityPricing.close]
    """

    window_length = 2
    window_safe = True
    inputs = [EquityPricing.close]
class SimpleMovingAverage(SingleInputMixin, CustomFactor):
    """
    Mean of an arbitrary column over the lookback window, skipping NaNs.

    **Default Inputs**: None

    **Default Window Length**: None
    """

    # numpy's nan-aware functions warn when given an all-NaN array, but they
    # still return the value we want (NaN), so the warning is suppressed.
    ctx = ignore_nanwarnings()

    def compute(self, today, assets, out, data):
        """Write the NaN-ignoring mean of each column of ``data``."""
        column_means = nanmean(data, axis=0)
        out[:] = column_means
class WeightedAverageValue(CustomFactor):
    """
    Weighted average of one input using a second input as the weights.

    Helper for VWAP-like computations.

    **Default Inputs:** None

    **Default Window Length:** None
    """

    def compute(self, today, assets, out, base, weight):
        """Write ``nansum(base * weight) / nansum(weight)`` per column."""
        weighted_total = nansum(base * weight, axis=0)
        total_weight = nansum(weight, axis=0)
        out[:] = weighted_total / total_weight
class VWAP(WeightedAverageValue):
    """
    Volume Weighted Average Price.

    Close price is the averaged value and volume supplies the weights.

    **Default Inputs:** [EquityPricing.close, EquityPricing.volume]

    **Default Window Length:** None
    """

    inputs = (EquityPricing.close, EquityPricing.volume)
class MaxDrawdown(SingleInputMixin, CustomFactor):
    """
    Largest peak-to-trough decline within the lookback window.

    **Default Inputs:** None

    **Default Window Length:** None
    """

    ctx = ignore_nanwarnings()

    def compute(self, today, assets, out, data):
        """
        Write the maximum drawdown of each column of ``data`` into ``out``.

        The drawdown is expressed relative to the value at the end of the
        decline: ``(peak - trough) / trough``.
        """
        # NOTE(review): drawdown is conventionally quoted relative to the
        # peak; this divides by the trough value -- confirm that is intended.

        # Distance of every row below the running (NaN-ignoring) maximum.
        declines = fmax.accumulate(data, axis=0) - data
        # Rows with no usable value must never be selected as the trough.
        declines[isnan(declines)] = NINF
        trough_rows = nanargmax(declines, axis=0)

        # TODO: Accelerate this loop in Cython or Numba.
        for column, trough_row in enumerate(trough_rows):
            trough = data[trough_row, column]
            peak = nanmax(data[: trough_row + 1, column])
            out[column] = (peak - trough) / trough
class AverageDollarVolume(CustomFactor):
    """
    Average daily dollar volume (close price times volume).

    **Default Inputs:** [EquityPricing.close, EquityPricing.volume]

    **Default Window Length:** None
    """

    inputs = [EquityPricing.close, EquityPricing.volume]

    def compute(self, today, assets, out, close, volume):
        """Write the summed ``close * volume`` divided by the row count."""
        # NaN products are skipped by the sum, but the divisor is always the
        # full number of rows in the window.
        dollar_volume = close * volume
        window_rows = len(close)
        out[:] = nansum(dollar_volume, axis=0) / window_rows
def exponential_weights(length, decay_rate):
    """
    Build a weight vector for an exponentially-weighted statistic.

    The resulting ndarray is of the form::

        [decay_rate ** (length + 1), ..., decay_rate ** 3, decay_rate ** 2]

    The most recent (last) entry therefore carries the largest weight when
    ``0 < decay_rate < 1``. Relative to the sequence
    ``decay_rate ** length, ..., decay_rate`` every entry is scaled by one
    extra factor of ``decay_rate``; a uniform scale like this cancels out
    in any statistic that normalizes by the weights.

    Parameters
    ----------
    length : int
        The length of the desired weight vector.
    decay_rate : float
        The rate at which entries in the weight vector increase or decrease.

    Returns
    -------
    weights : ndarray[float64]
    """
    return full(length, decay_rate, float64_dtype) ** arange(length + 1, 1, -1)


class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor):
    """
    Base class for factors implementing exponential-weighted operations.

    **Default Inputs:** None

    **Default Window Length:** None

    Parameters
    ----------
    inputs : length-1 list or tuple of BoundColumn
        The expression over which to compute the average.
    window_length : int > 0
        Length of the lookback window over which to compute the average.
    decay_rate : float, 0 < decay_rate <= 1
        Weighting factor by which to discount past observations.

        When calculating historical averages, rows are multiplied by the
        sequence::

            decay_rate, decay_rate ** 2, decay_rate ** 3, ...

    Methods
    -------
    weights
    from_span
    from_halflife
    from_center_of_mass
    """

    params = ("decay_rate",)

    @classmethod
    @expect_types(span=Number)
    def from_span(cls, inputs, window_length, span, **kwargs):
        """
        Convenience constructor for passing `decay_rate` in terms of `span`.

        Forwards `decay_rate` as `1 - (2.0 / (1 + span))`.  This provides the
        behavior equivalent to passing `span` to pandas.ewma.

        Raises
        ------
        ValueError
            If ``span`` is less than or equal to 1, which would give a
            non-positive ``decay_rate``.

        Examples
        --------
        .. code-block:: python

            # Equivalent to:
            # my_ewma = EWMA(
            #    inputs=[EquityPricing.close],
            #    window_length=30,
            #    decay_rate=(1 - (2.0 / (1 + 15.0))),
            # )
            my_ewma = EWMA.from_span(
                inputs=[EquityPricing.close],
                window_length=30,
                span=15,
            )

        Notes
        -----
        This classmethod is provided by both
        :class:`ExponentialWeightedMovingAverage` and
        :class:`ExponentialWeightedMovingStdDev`.
        """
        if span <= 1:
            # span == 1 maps to decay_rate == 0, so "positive" is not enough.
            raise ValueError(
                "`span` must be greater than 1. %s was passed." % span
            )

        decay_rate = 1.0 - (2.0 / (1.0 + span))
        # Internal sanity check on the derived rate, not input validation.
        assert 0.0 < decay_rate <= 1.0

        return cls(
            inputs=inputs,
            window_length=window_length,
            decay_rate=decay_rate,
            **kwargs
        )

    @classmethod
    @expect_types(halflife=Number)
    def from_halflife(cls, inputs, window_length, halflife, **kwargs):
        """
        Convenience constructor for passing ``decay_rate`` in terms of half
        life.

        Forwards ``decay_rate`` as ``exp(log(.5) / halflife)``.  This provides
        the behavior equivalent to passing `halflife` to pandas.ewma.

        Raises
        ------
        ValueError
            If ``halflife`` is less than or equal to 0.

        Examples
        --------
        .. code-block:: python

            # Equivalent to:
            # my_ewma = EWMA(
            #    inputs=[EquityPricing.close],
            #    window_length=30,
            #    decay_rate=np.exp(np.log(0.5) / 15),
            # )
            my_ewma = EWMA.from_halflife(
                inputs=[EquityPricing.close],
                window_length=30,
                halflife=15,
            )

        Notes
        -----
        This classmethod is provided by both
        :class:`ExponentialWeightedMovingAverage` and
        :class:`ExponentialWeightedMovingStdDev`.
        """
        if halflife <= 0:
            raise ValueError(
                "`halflife` must be a positive number. %s was passed."
                % halflife
            )
        decay_rate = exp(log(0.5) / halflife)
        # Internal sanity check on the derived rate, not input validation.
        assert 0.0 < decay_rate <= 1.0

        return cls(
            inputs=inputs,
            window_length=window_length,
            decay_rate=decay_rate,
            **kwargs
        )

    @classmethod
    def from_center_of_mass(cls, inputs, window_length, center_of_mass, **kwargs):
        """
        Convenience constructor for passing `decay_rate` in terms of center of
        mass.

        Forwards `decay_rate` as `1 - (1 / (1 + center_of_mass))`.  This
        provides behavior equivalent to passing `center_of_mass` to
        pandas.ewma.

        Unlike ``from_span`` and ``from_halflife``, this constructor does not
        validate its argument before deriving ``decay_rate``.

        Examples
        --------
        .. code-block:: python

            # Equivalent to:
            # my_ewma = EWMA(
            #    inputs=[EquityPricing.close],
            #    window_length=30,
            #    decay_rate=(1 - (1 / (1 + 15.0))),
            # )
            my_ewma = EWMA.from_center_of_mass(
                inputs=[EquityPricing.close],
                window_length=30,
                center_of_mass=15,
            )

        Notes
        -----
        This classmethod is provided by both
        :class:`ExponentialWeightedMovingAverage` and
        :class:`ExponentialWeightedMovingStdDev`.
        """
        return cls(
            inputs=inputs,
            window_length=window_length,
            decay_rate=(1.0 - (1.0 / (1.0 + center_of_mass))),
            **kwargs,
        )
class ExponentialWeightedMovingAverage(_ExponentialWeightedFactor):
    """
    Exponentially Weighted Moving Average

    **Default Inputs:** None

    **Default Window Length:** None

    Parameters
    ----------
    inputs : length-1 list/tuple of BoundColumn
        The expression over which to compute the average.
    window_length : int > 0
        Length of the lookback window over which to compute the average.
    decay_rate : float, 0 < decay_rate <= 1
        Weighting factor by which to discount past observations.

        When calculating historical averages, rows are multiplied by the
        sequence::

            decay_rate, decay_rate ** 2, decay_rate ** 3, ...

    Notes
    -----
    - This class can also be imported under the name ``EWMA``.

    See Also
    --------
    :meth:`pandas.DataFrame.ewm`
    """

    def compute(self, today, assets, out, data, decay_rate):
        """Write the exponentially weighted mean of each column of ``data``."""
        # One weight per row of the window.
        row_weights = exponential_weights(len(data), decay_rate)
        out[:] = average(data, axis=0, weights=row_weights)
class ExponentialWeightedMovingStdDev(_ExponentialWeightedFactor):
    """
    Exponentially Weighted Moving Standard Deviation

    **Default Inputs:** None

    **Default Window Length:** None

    Parameters
    ----------
    inputs : length-1 list/tuple of BoundColumn
        The expression over which to compute the average.
    window_length : int > 0
        Length of the lookback window over which to compute the average.
    decay_rate : float, 0 < decay_rate <= 1
        Weighting factor by which to discount past observations.

        When calculating historical averages, rows are multiplied by the
        sequence::

            decay_rate, decay_rate ** 2, decay_rate ** 3, ...

    Notes
    -----
    - This class can also be imported under the name ``EWMSTD``.

    See Also
    --------
    :func:`pandas.DataFrame.ewm`
    """

    def compute(self, today, assets, out, data, decay_rate):
        """
        Write the bias-corrected exponentially weighted standard deviation
        of each column of ``data`` into ``out``.
        """
        row_weights = exponential_weights(len(data), decay_rate)

        # Scale factor (sum w)^2 / ((sum w)^2 - sum(w^2)) applied to the
        # weighted variance below.
        total_weight_sq = np_sum(row_weights) ** 2
        correction = total_weight_sq / (
            total_weight_sq - np_sum(row_weights**2)
        )

        weighted_mean = average(data, axis=0, weights=row_weights)
        weighted_var = average(
            (data - weighted_mean) ** 2,
            axis=0,
            weights=row_weights,
        )

        out[:] = sqrt(weighted_var * correction)
class LinearWeightedMovingAverage(SingleInputMixin, CustomFactor):
    """
    Linearly weighted average of an arbitrary column.

    Row ``k`` of the window (1-based, oldest first) is weighted by ``k``.

    **Default Inputs**: None

    **Default Window Length**: None
    """

    # numpy's nan-aware functions warn when given an all-NaN array, but they
    # still return the value we want (NaN), so the warning is suppressed.
    ctx = ignore_nanwarnings()

    def compute(self, today, assets, out, data):
        """Write the linearly weighted average of each column of ``data``."""
        ndays = data.shape[0]

        # Column vector 1..ndays so it broadcasts across every asset column.
        row_weights = arange(1, ndays + 1, dtype=float64_dtype)[:, None]

        # Sum of the weights 1 + 2 + ... + ndays.
        weight_total = (ndays * (ndays + 1)) / 2

        out[:] = nansum(data * row_weights, axis=0) / weight_total


class AnnualizedVolatility(CustomFactor):
    """
    Volatility. The degree of variation of a series over time as measured by
    the standard deviation of daily returns.
    https://en.wikipedia.org/wiki/Volatility_(finance)

    **Default Inputs:** [Returns(window_length=2)]

    Parameters
    ----------
    annualization_factor : float, optional
        The number of time units per year. Default is 252, the number of NYSE
        trading days in a normal year.
    """

    inputs = [Returns(window_length=2)]
    params = {"annualization_factor": 252.0}
    window_length = 252

    def compute(self, today, assets, out, returns, annualization_factor):
        """Write the NaN-ignoring std of ``returns`` scaled to a yearly figure."""
        scale = annualization_factor**0.5
        out[:] = nanstd(returns, axis=0) * scale
class PeerCount(SingleInputMixin, CustomFactor):
    """
    Peer Count of distinct categories in a given classifier.  This factor
    is returned by the classifier instance method peer_count()

    **Default Inputs:** None

    **Default Window Length:** 1
    """

    window_length = 1

    def _validate(self):
        """
        Run the parent validation, then require a window length of exactly 1.

        Raises
        ------
        ValueError
            If ``window_length`` is not 1.
        """
        super(PeerCount, self)._validate()
        if self.window_length != 1:
            # The first fragment ends with a space so the given value is not
            # glued onto the word "given".
            raise ValueError(
                "'PeerCount' expected a window length of 1, but was given "
                "{window_length}.".format(window_length=self.window_length)
            )

    def compute(self, today, assets, out, classifier_values):
        """
        Write, for each asset, how many assets share its classifier label.

        Assets whose label equals the classifier's null label are not
        written; ``out`` keeps whatever value it already held for them.
        """
        # Convert classifier array to group label int array
        group_labels, null_label = self.inputs[0]._to_integral(classifier_values[0])
        _, inverse, counts = unique(  # Get counts, idx of unique groups
            group_labels,
            return_counts=True,
            return_inverse=True,
        )
        # counts[inverse] expands the per-group counts back to one per asset.
        copyto(out, counts[inverse], where=(group_labels != null_label))
# Short names for the exponentially weighted factors.
EWMA = ExponentialWeightedMovingAverage
EWMSTD = ExponentialWeightedMovingStdDev


class Clip(CustomFactor):
    """
    Clip (limit) the values in a factor.

    Given an interval, values outside the interval are clipped to the interval
    edges. For example, if an interval of ``[0, 1]`` is specified, values
    smaller than 0 become 0, and values larger than 1 become 1.

    **Default Window Length:** 1

    Parameters
    ----------
    min_bound : float
        The minimum value to use.
    max_bound : float
        The maximum value to use.

    Notes
    -----
    To only clip values on one side, ``-np.inf`` and ``np.inf`` may be passed.
    For example, to only clip the maximum value but not clip a minimum value:

    .. code-block:: python

       Clip(inputs=[factor], min_bound=-np.inf, max_bound=user_provided_max)

    See Also
    --------
    numpy.clip
    """

    window_length = 1
    params = ("min_bound", "max_bound")

    def compute(self, today, assets, out, values, min_bound, max_bound):
        """Clip the last row of ``values`` to the bounds, writing into ``out``."""
        latest_row = values[-1]
        clip(latest_row, min_bound, max_bound, out=out)