Source code for zipline.pipeline.term

"""
Base class for Filters, Factors and Classifiers
"""
from abc import ABC, abstractmethod
from bisect import insort
from collections.abc import Mapping
from weakref import WeakValueDictionary

from numpy import (
    array,
    record,
    dtype as dtype_class,
    ndarray,
)
from zipline.assets import Asset
from zipline.errors import (
    DTypeNotSpecified,
    InvalidOutputName,
    NonSliceableTerm,
    NonWindowSafeInput,
    NotDType,
    NonPipelineInputs,
    TermInputsNotSpecified,
    TermOutputsEmpty,
    UnsupportedDType,
    WindowLengthNotSpecified,
)
from zipline.lib.adjusted_array import can_represent_dtype
from zipline.lib.labelarray import LabelArray
from zipline.utils.input_validation import expect_types
from zipline.utils.memoize import classlazyval, lazyval
from zipline.utils.numpy_utils import (
    bool_dtype,
    categorical_dtype,
    datetime64ns_dtype,
    default_missing_value_for_dtype,
    float64_dtype,
)
from zipline.utils.sharedoc import (
    templated_docstring,
    PIPELINE_ALIAS_NAME_DOC,
    PIPELINE_DOWNSAMPLING_FREQUENCY_DOC,
)

from .domain import Domain, GENERIC, infer_domain
from .downsample_helpers import expect_downsample_frequency
from .sentinels import NotSpecified


class Term(ABC):
    """
    Base class for objects that can appear in the compute graph of a
    :class:`zipline.pipeline.Pipeline`.

    Notes
    -----
    Most Pipeline API users only interact with :class:`Term` via subclasses:

    - :class:`~zipline.pipeline.data.BoundColumn`
    - :class:`~zipline.pipeline.Factor`
    - :class:`~zipline.pipeline.Filter`
    - :class:`~zipline.pipeline.Classifier`

    Instances of :class:`Term` are **memoized**. If you call a Term's
    constructor with the same arguments twice, the same object will be
    returned from both calls:

    **Example:**

    >>> from zipline.pipeline.data import EquityPricing
    >>> from zipline.pipeline.factors import SimpleMovingAverage
    >>> x = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=5)
    >>> y = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=5)
    >>> x is y
    True

    .. warning::

       Memoization of terms means that it's generally unsafe to modify
       attributes of a term after construction.
    """

    # These are NotSpecified because a subclass is required to provide them.
    dtype = NotSpecified
    missing_value = NotSpecified

    # Subclasses aren't required to provide `params`. The default behavior is
    # no params.
    params = ()

    # All terms are generic by default.
    domain = GENERIC

    # Determines if a term is safe to be used as a windowed input.
    window_safe = False

    # The dimensions of the term's output (1D or 2D).
    ndim = 2

    _term_cache = WeakValueDictionary()

    def __new__(
        cls,
        domain=NotSpecified,
        dtype=NotSpecified,
        missing_value=NotSpecified,
        window_safe=NotSpecified,
        ndim=NotSpecified,
        # params is explicitly not allowed to be passed to an instance.
        *args,
        **kwargs,
    ):
        """
        Memoized constructor for Terms.

        Caching previously-constructed Terms is useful because it allows us to
        only compute equivalent sub-expressions once when traversing a
        Pipeline dependency graph.

        Caching previously-constructed Terms is **sane** because terms and
        their inputs are both conceptually immutable.
        """
        # Subclasses can override these class-level attributes to provide
        # different default values for instances.
        if domain is NotSpecified:
            domain = cls.domain
        if dtype is NotSpecified:
            dtype = cls.dtype
        if missing_value is NotSpecified:
            missing_value = cls.missing_value
        if ndim is NotSpecified:
            ndim = cls.ndim
        if window_safe is NotSpecified:
            window_safe = cls.window_safe

        dtype, missing_value = validate_dtype(
            cls.__name__,
            dtype,
            missing_value,
        )
        params = cls._pop_params(kwargs)

        identity = cls._static_identity(
            domain=domain,
            dtype=dtype,
            missing_value=missing_value,
            window_safe=window_safe,
            ndim=ndim,
            params=params,
            *args,
            **kwargs,
        )

        try:
            return cls._term_cache[identity]
        except KeyError:
            new_instance = cls._term_cache[identity] = (
                super(Term, cls)
                .__new__(cls)
                ._init(
                    domain=domain,
                    dtype=dtype,
                    missing_value=missing_value,
                    window_safe=window_safe,
                    ndim=ndim,
                    params=params,
                    *args,
                    **kwargs,
                )
            )
            return new_instance

    @classmethod
    def _pop_params(cls, kwargs):
        """
        Pop entries from the `kwargs` passed to cls.__new__ based on the
        values in `cls.params`.

        Parameters
        ----------
        kwargs : dict
            The kwargs passed to cls.__new__.

        Returns
        -------
        params : tuple[(str, object)]
            A tuple of (key, value) pairs containing the entries in
            cls.params.

        Raises
        ------
        TypeError
            Raised if any parameter values are not passed or not hashable.
""" params = cls.params if not isinstance(params, Mapping): params = {k: NotSpecified for k in params} param_values = [] for key, default_value in params.items(): try: value = kwargs.pop(key, default_value) if value is NotSpecified: raise KeyError(key) # Check here that the value is hashable so that we fail here # instead of trying to hash the param values tuple later. hash(value) except KeyError as exc: raise TypeError( "{typename} expected a keyword parameter {name!r}.".format( typename=cls.__name__, name=key ) ) from exc except TypeError as exc: # Value wasn't hashable. raise TypeError( "{typename} expected a hashable value for parameter " "{name!r}, but got {value!r} instead.".format( typename=cls.__name__, name=key, value=value, ) ) from exc param_values.append((key, value)) return tuple(param_values) def __init__(self, *args, **kwargs): """ Noop constructor to play nicely with our caching __new__. Subclasses should implement _init instead of this method. When a class' __new__ returns an instance of that class, Python will automatically call __init__ on the object, even if a new object wasn't actually constructed. Because we memoize instances, we often return an object that was already initialized from __new__, in which case we don't want to call __init__ again. Subclasses that need to initialize new instances should override _init, which is guaranteed to be called only once. """ pass @expect_types(key=Asset) def __getitem__(self, key): if isinstance(self, LoadableTerm): raise NonSliceableTerm(term=self) from .mixins import SliceMixin slice_type = type(self)._with_mixin(SliceMixin) return slice_type(self, key) @classmethod def _static_identity(cls, domain, dtype, missing_value, window_safe, ndim, params): """ Return the identity of the Term that would be constructed from the given arguments. Identities that compare equal will cause us to return a cached instance rather than constructing a new one. We do this primarily because it makes dependency resolution easier. This is a classmethod so that it can be called from Term.__new__ to determine whether to produce a new instance. """ return (cls, domain, dtype, missing_value, window_safe, ndim, params) def _init(self, domain, dtype, missing_value, window_safe, ndim, params): """ Parameters ---------- domain : zipline.pipeline.domain.Domain The domain of this term. dtype : np.dtype Dtype of this term's output. missing_value : object Missing value for this term. ndim : 1 or 2 The dimensionality of this term. params : tuple[(str, hashable)] Tuple of key/value pairs of additional parameters. """ self.domain = domain self.dtype = dtype self.missing_value = missing_value self.window_safe = window_safe self.ndim = ndim for name, _ in params: if hasattr(self, name): raise TypeError( "Parameter {name!r} conflicts with already-present" " attribute with value {value!r}.".format( name=name, value=getattr(self, name), ) ) # TODO: Consider setting these values as attributes and replacing # the boilerplate in NumericalExpression, Rank, and # PercentileFilter. self.params = dict(params) # Make sure that subclasses call super() in their _validate() methods # by setting this flag. The base class implementation of _validate # should set this flag to True. self._subclass_called_super_validate = False self._validate() assert self._subclass_called_super_validate, ( "Term._validate() was not called.\n" "This probably means that you overrode _validate" " without calling super()." 
        )
        del self._subclass_called_super_validate

        return self

    def _validate(self):
        """
        Assert that this term is well-formed. This should be called exactly
        once, at the end of Term._init().
        """
        # Mark that we got here to enforce that subclasses overriding
        # _validate call super().
        self._subclass_called_super_validate = True

    def compute_extra_rows(self, all_dates, start_date, end_date, min_extra_rows):
        """
        Calculate the number of extra rows needed to compute ``self``.

        Must return at least ``min_extra_rows``, and the default
        implementation is to just return ``min_extra_rows``. This is
        overridden by downsampled terms to ensure that the first date computed
        is a recomputation date.

        Parameters
        ----------
        all_dates : pd.DatetimeIndex
            The trading sessions against which ``self`` will be computed.
        start_date : pd.Timestamp
            The first date for which final output is requested.
        end_date : pd.Timestamp
            The last date for which final output is requested.
        min_extra_rows : int
            The minimum number of extra rows required of ``self``, as
            determined by other terms that depend on ``self``.

        Returns
        -------
        extra_rows : int
            The number of extra rows to compute. Must be at least
            ``min_extra_rows``.
        """
        return min_extra_rows

    @property
    @abstractmethod
    def inputs(self):
        """
        A tuple of other Terms needed as inputs for ``self``.
        """
        raise NotImplementedError("inputs")

    @property
    @abstractmethod
    def windowed(self):
        """
        Boolean indicating whether this term is a trailing-window computation.
        """
        raise NotImplementedError("windowed")

    @property
    @abstractmethod
    def mask(self):
        """
        A :class:`~zipline.pipeline.Filter` representing asset/date pairs to
        include while computing this Term. True means include; False means
        exclude.
        """
        raise NotImplementedError("mask")

    @property
    @abstractmethod
    def dependencies(self):
        """
        A dictionary mapping terms that must be computed before `self` to the
        number of extra rows needed for those terms.
        """
        raise NotImplementedError("dependencies")
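
    # The concrete implementations of this abstract interface live later in
    # this module: ``LoadableTerm`` supplies ``inputs``, ``windowed``, and
    # ``dependencies`` for terms loaded from external data, while
    # ``ComputableTerm`` supplies all four attributes for computed
    # expressions.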

    def graph_repr(self):
        """A short repr to use when rendering GraphViz graphs."""
        # Default graph_repr is just the name of the type.
        return type(self).__name__
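
    # ComputableTerm (below) overrides recursive_repr to render nested inputs
    # as ``TypeName(...)``, keeping __repr__ output readable for deep graphs.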

    def recursive_repr(self):
        """A short repr to use when recursively rendering terms with inputs."""
        # Default recursive_repr is just the name of the type.
        return type(self).__name__
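

# A minimal sketch (not part of the original module) exercising the
# memoization contract documented on ``Term`` above. It reuses only the names
# from the class docstring example; run it manually, e.g. from a REPL.
def _demo_term_memoization():
    from zipline.pipeline.data import EquityPricing
    from zipline.pipeline.factors import SimpleMovingAverage

    # Identical constructor arguments produce the same _static_identity, hit
    # _term_cache, and return the *same* object...
    x = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=5)
    y = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=5)
    assert x is y

    # ...while any differing argument yields a distinct identity, and hence a
    # distinct cached instance.
    z = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    assert x is not z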


class AssetExists(Term):
    """
    Pseudo-filter describing whether or not an asset existed on a given day.
    This is the default mask for all terms that haven't been passed a mask
    explicitly.

    This is morally a Filter, in the sense that it produces a boolean value
    for every asset on every date. We don't subclass Filter, however, because
    `AssetExists` is computed directly by the PipelineEngine.

    This term is guaranteed to be available as an input for any term computed
    by SimplePipelineEngine.run_pipeline().

    See Also
    --------
    zipline.assets.AssetFinder.lifetimes
    """

    dtype = bool_dtype
    dataset = None
    inputs = ()
    dependencies = {}
    mask = None
    windowed = False

    def __repr__(self):
        return "AssetExists()"

    graph_repr = __repr__

    def _compute(self, today, assets, out):
        raise NotImplementedError(
            "AssetExists cannot be computed directly."
            " Check your PipelineEngine configuration."
        )


class InputDates(Term):
    """
    1-Dimensional term providing date labels for other term inputs.

    This term is guaranteed to be available as an input for any term computed
    by SimplePipelineEngine.run_pipeline().
    """

    ndim = 1
    dataset = None
    dtype = datetime64ns_dtype
    inputs = ()
    dependencies = {}
    mask = None
    windowed = False
    window_safe = True

    def __repr__(self):
        return "InputDates()"

    graph_repr = __repr__

    def _compute(self, today, assets, out):
        raise NotImplementedError(
            "InputDates cannot be computed directly."
            " Check your PipelineEngine configuration."
        )


class LoadableTerm(Term):
    """
    A Term that should be loaded from an external resource by a
    PipelineLoader.

    This is the base class for :class:`zipline.pipeline.data.BoundColumn`.
    """

    windowed = False
    inputs = ()

    @lazyval
    def dependencies(self):
        return {self.mask: 0}


class ComputableTerm(Term):
    """
    A Term that should be computed from a tuple of inputs.

    This is the base class for :class:`zipline.pipeline.Factor`,
    :class:`zipline.pipeline.Filter`, and
    :class:`zipline.pipeline.Classifier`.
    """

    inputs = NotSpecified
    outputs = NotSpecified
    window_length = NotSpecified
    mask = NotSpecified
    domain = NotSpecified

    def __new__(
        cls,
        inputs=inputs,
        outputs=outputs,
        window_length=window_length,
        mask=mask,
        domain=domain,
        *args,
        **kwargs,
    ):
        if inputs is NotSpecified:
            inputs = cls.inputs

        # Having inputs = NotSpecified is an error, but we handle it later
        # in self._validate rather than here.
        if inputs is not NotSpecified:
            # Allow users to specify lists as class-level defaults, but
            # normalize to a tuple so that inputs is hashable.
            inputs = tuple(inputs)

            # Make sure all our inputs are valid pipeline objects before
            # trying to infer a domain.
            non_terms = [t for t in inputs if not isinstance(t, Term)]
            if non_terms:
                raise NonPipelineInputs(cls.__name__, non_terms)

            if domain is NotSpecified:
                domain = infer_domain(inputs)

        if outputs is NotSpecified:
            outputs = cls.outputs
        if outputs is not NotSpecified:
            outputs = tuple(outputs)

        if mask is NotSpecified:
            mask = cls.mask
        if mask is NotSpecified:
            mask = AssetExists()

        if window_length is NotSpecified:
            window_length = cls.window_length

        return super(ComputableTerm, cls).__new__(
            cls,
            inputs=inputs,
            outputs=outputs,
            mask=mask,
            window_length=window_length,
            domain=domain,
            *args,
            **kwargs,
        )

    def _init(self, inputs, outputs, window_length, mask, *args, **kwargs):
        self.inputs = inputs
        self.outputs = outputs
        self.window_length = window_length
        self.mask = mask
        return super(ComputableTerm, self)._init(*args, **kwargs)

    @classmethod
    def _static_identity(cls, inputs, outputs, window_length, mask, *args, **kwargs):
        return (
            super(ComputableTerm, cls)._static_identity(*args, **kwargs),
            inputs,
            outputs,
            window_length,
            mask,
        )

    def _validate(self):
        super(ComputableTerm, self)._validate()

        # Check inputs.
        if self.inputs is NotSpecified:
            raise TermInputsNotSpecified(termname=type(self).__name__)

        if not isinstance(self.domain, Domain):
            raise TypeError(
                "Expected {}.domain to be an instance of Domain, "
                "but got {}.".format(type(self).__name__, type(self.domain))
            )

        # Check outputs.
        if self.outputs is NotSpecified:
            pass
        elif not self.outputs:
            raise TermOutputsEmpty(termname=type(self).__name__)
        else:
            # Raise an exception if there are any naming conflicts between the
            # term's output names and certain attributes.
            disallowed_names = [
                attr for attr in dir(ComputableTerm) if not attr.startswith("_")
            ]

            # The name 'compute' is an added special case that is disallowed.
            # Use insort to add it to the list in alphabetical order.
            insort(disallowed_names, "compute")

            for output in self.outputs:
                if output.startswith("_") or output in disallowed_names:
                    raise InvalidOutputName(
                        output_name=output,
                        termname=type(self).__name__,
                        disallowed_names=disallowed_names,
                    )

        if self.window_length is NotSpecified:
            raise WindowLengthNotSpecified(termname=type(self).__name__)

        if self.mask is NotSpecified:
            # This isn't user error, this is a bug in our code.
            raise AssertionError("{term} has no mask".format(term=self))

        if self.window_length > 1:
            for child in self.inputs:
                if not child.window_safe:
                    raise NonWindowSafeInput(parent=self, child=child)

    def _compute(self, inputs, dates, assets, mask):
        """
        Subclasses should implement this to perform actual computation.

        This is named ``_compute`` rather than just ``compute`` because
        ``compute`` is reserved for user-supplied functions in
        CustomFilter/CustomFactor/CustomClassifier.
        """
        raise NotImplementedError("_compute")

    # NOTE: This is a method rather than a property because ABCMeta tries to
    #       access all abstract attributes of its child classes to see if
    #       they've been implemented. These accesses happen during subclass
    #       creation, before the new subclass has been bound to a name in its
    #       defining scope. Filter, Factor, and Classifier each implement this
    #       method to return themselves, but if the method is invoked before
    #       class definition is finished (which happens if this is a
    #       property), they fail with a NameError.
    @classmethod
    @abstractmethod
    def _principal_computable_term_type(cls):
        """
        Return the "principal" type for a ComputableTerm.

        This returns either Filter, Factor, or Classifier, depending on the
        type of ``cls``.

        It is used to implement behaviors like ``downsample`` and ``if_else``
        that are implemented on all ComputableTerms, but that need to produce
        different output types depending on the type of the receiver.
        """
        raise NotImplementedError("_principal_computable_term_type")

    @lazyval
    def windowed(self):
        """
        Whether or not this term represents a trailing window computation.

        If term.windowed is truthy, its compute_from_windows method will be
        called with instances of AdjustedArray as inputs.

        If term.windowed is falsey, its compute_from_baseline will be called
        with instances of np.ndarray as inputs.
        """
        return self.window_length is not NotSpecified and self.window_length > 0

    @lazyval
    def dependencies(self):
        """
        The number of extra rows needed for each of our inputs to compute this
        term.
        """
        extra_input_rows = max(0, self.window_length - 1)
        out = {}
        for term in self.inputs:
            out[term] = extra_input_rows
        out[self.mask] = 0
        return out

    @expect_types(data=ndarray)
    def postprocess(self, data):
        """
        Called with a result of ``self``, unravelled (i.e. 1-dimensional)
        after any user-defined screens have been applied.

        This is mostly useful for transforming the dtype of an output, e.g.,
        to convert a LabelArray into a pandas Categorical.

        The default implementation is to just return data unchanged.
        """
        # Starting with pandas 1.4, record arrays are no longer supported as
        # DataFrame columns.
        if isinstance(data[0], record):
            return [tuple(r) for r in data]
        return data

    def to_workspace_value(self, result, assets):
        """
        Called with a column of the result of a pipeline. This needs to put
        the data into a format that can be used in a workspace to continue
        doing computations.

        Parameters
        ----------
        result : pd.Series
            A multiindexed series with (dates, assets) whose values are the
            results of running this pipeline term over the dates.
        assets : pd.Index
            All of the assets being requested. This allows us to correctly
            shape the workspace value.

        Returns
        -------
        workspace_value : array-like
            An array like value that the engine can consume.
        """
        return (
            result.unstack()
            .fillna(self.missing_value)
            .reindex(columns=assets, fill_value=self.missing_value)
            .values
        )

    @expect_downsample_frequency
    @templated_docstring(frequency=PIPELINE_DOWNSAMPLING_FREQUENCY_DOC)
    def downsample(self, frequency):
        """
        Make a term that computes from ``self`` at lower-than-daily frequency.

        Parameters
        ----------
        {frequency}
        """
        from .mixins import DownsampledMixin

        downsampled_type = type(self)._with_mixin(DownsampledMixin)
        return downsampled_type(term=self, frequency=frequency)

    @templated_docstring(name=PIPELINE_ALIAS_NAME_DOC)
    def alias(self, name):
        """
        Make a term from ``self`` that names the expression.

        Parameters
        ----------
        {name}

        Returns
        -------
        aliased : Aliased
            ``self`` with a name.

        Notes
        -----
        This is useful for giving a name to a numerical or boolean expression.
        """
        from .mixins import AliasedMixin

        aliased_type = type(self)._with_mixin(AliasedMixin)
        return aliased_type(term=self, name=name)

    def isnull(self):
        """
        A Filter producing True for values where this Factor has missing data.

        Equivalent to ``self.isnan()`` when ``self.dtype`` is float64.
        Otherwise equivalent to ``self.eq(self.missing_value)``.

        Returns
        -------
        filter : zipline.pipeline.Filter
        """
        if self.dtype == bool_dtype:
            raise TypeError("isnull() is not supported for Filters")

        from .filters import NullFilter

        if self.dtype == float64_dtype:
            # Using isnan is more efficient when possible because we can fold
            # the isnan computation with other NumExpr expressions.
            return self.isnan()
        else:
            return NullFilter(self)

    def notnull(self):
        """
        A Filter producing True for values where this Factor has complete
        data.

        Equivalent to ``~self.isnan()`` when ``self.dtype`` is float64.
        Otherwise equivalent to ``(self != self.missing_value)``.

        Returns
        -------
        filter : zipline.pipeline.Filter
        """
        if self.dtype == bool_dtype:
            raise TypeError("notnull() is not supported for Filters")

        from .filters import NotNullFilter

        return NotNullFilter(self)

    def fillna(self, fill_value):
        """
        Create a new term that fills missing values of this term's output with
        ``fill_value``.

        Parameters
        ----------
        fill_value : zipline.pipeline.ComputableTerm, or object.
            Object to use as replacement for missing values.

            If a ComputableTerm (e.g. a Factor) is passed, that term's results
            will be used as fill values.

            If a scalar (e.g. a number) is passed, the scalar will be used as
            a fill value.

        Examples
        --------
        **Filling with a Scalar:**

        Let ``f`` be a Factor which would produce the following output::

                         AAPL   MSFT    MCD     BK
            2017-03-13    1.0    NaN    3.0    4.0
            2017-03-14    1.5    2.5    NaN    NaN

        Then ``f.fillna(0)`` produces the following output::

                         AAPL   MSFT    MCD     BK
            2017-03-13    1.0    0.0    3.0    4.0
            2017-03-14    1.5    2.5    0.0    0.0

        **Filling with a Term:**

        Let ``f`` be as above, and let ``g`` be another Factor which would
        produce the following output::

                         AAPL   MSFT    MCD     BK
            2017-03-13   10.0   20.0   30.0   40.0
            2017-03-14   15.0   25.0   35.0   45.0

        Then, ``f.fillna(g)`` produces the following output::

                         AAPL   MSFT    MCD     BK
            2017-03-13    1.0   20.0    3.0    4.0
            2017-03-14    1.5    2.5   35.0   45.0

        Returns
        -------
        filled : zipline.pipeline.ComputableTerm
            A term computing the same results as ``self``, but with missing
            values filled in using values from ``fill_value``.
        """
        if self.dtype == bool_dtype:
            raise TypeError("fillna() is not supported for Filters")

        if isinstance(fill_value, LoadableTerm):
            raise TypeError(
                "Can't use expression {} as a fill value. Did you mean to "
                "append '.latest'?".format(fill_value)
            )
        elif isinstance(fill_value, ComputableTerm):
            if_false = fill_value
        else:
            # Assume we got a scalar value. Make sure it's compatible with our
            # dtype.
            try:
                fill_value = _coerce_to_dtype(fill_value, self.dtype)
            except TypeError as exc:
                raise TypeError(
                    "Fill value {value!r} is not a valid choice "
                    "for term {termname} with dtype {dtype}.\n\n"
                    "Coercion attempt failed with: {error}".format(
                        termname=type(self).__name__,
                        value=fill_value,
                        dtype=self.dtype,
                        error=exc,
                    )
                ) from exc

            if_false = self._constant_type(
                const=fill_value,
                dtype=self.dtype,
                missing_value=self.missing_value,
            )

        return self.notnull().if_else(if_true=self, if_false=if_false)

    @classlazyval
    def _constant_type(cls):
        from .mixins import ConstantMixin

        return cls._with_mixin(ConstantMixin)

    @classlazyval
    def _if_else_type(cls):
        from .mixins import IfElseMixin

        return cls._with_mixin(IfElseMixin)

    def __repr__(self):
        return ("{type}([{inputs}], {window_length})").format(
            type=type(self).__name__,
            inputs=", ".join(i.recursive_repr() for i in self.inputs),
            window_length=self.window_length,
        )

    def recursive_repr(self):
        return type(self).__name__ + "(...)"

    @classmethod
    def _with_mixin(cls, mixin_type):
        return mixin_type.universal_mixin_specialization(
            cls._principal_computable_term_type(),
        )


def validate_dtype(termname, dtype, missing_value):
    """
    Validate a `dtype` and `missing_value` passed to Term.__new__.

    Ensures that we know how to represent ``dtype``, and that missing_value is
    specified for types without default missing values.

    Returns
    -------
    validated_dtype, validated_missing_value : np.dtype, any
        The dtype and missing_value to use for the new term.

    Raises
    ------
    DTypeNotSpecified
        When no dtype was passed to the instance, and the class doesn't
        provide a default.
    NotDType
        When either the class or the instance provides a value not coercible
        to a numpy dtype.
    NoDefaultMissingValue
        When dtype requires an explicit missing_value, but ``missing_value``
        is NotSpecified.
    """
    if dtype is NotSpecified:
        raise DTypeNotSpecified(termname=termname)

    try:
        dtype = dtype_class(dtype)
    except TypeError as exc:
        raise NotDType(dtype=dtype, termname=termname) from exc

    if not can_represent_dtype(dtype):
        raise UnsupportedDType(dtype=dtype, termname=termname)

    if missing_value is NotSpecified:
        missing_value = default_missing_value_for_dtype(dtype)

    try:
        _coerce_to_dtype(missing_value, dtype)
    except TypeError as exc:
        raise TypeError(
            "Missing value {value!r} is not a valid choice "
            "for term {termname} with dtype {dtype}.\n\n"
            "Coercion attempt failed with: {error}".format(
                termname=termname,
                value=missing_value,
                dtype=dtype,
                error=exc,
            )
        ) from exc

    return dtype, missing_value


def _assert_valid_categorical_missing_value(value):
    """
    Check that value is a valid categorical missing_value.

    Raises a TypeError if the value cannot be used as the missing_value for a
    categorical_dtype Term.
    """
    label_types = LabelArray.SUPPORTED_SCALAR_TYPES
    if not isinstance(value, label_types):
        raise TypeError(
            "String-dtype classifiers can only produce {types}.".format(
                types=" or ".join([t.__name__ for t in label_types])
            )
        )


def _coerce_to_dtype(value, dtype):
    if dtype == categorical_dtype:
        # This check is necessary because we use object dtype for
        # categoricals, and numpy will allow us to promote numerical
        # values to object even though we don't support them.
        _assert_valid_categorical_missing_value(value)
        return value
    else:
        # For any other type, cast using the same rules as numpy's astype
        # function with casting='same_kind'.
        #
        # 'same_kind' allows casting between things like float32 and float64,
        # but not between str and int. Note that the name is somewhat
        # misleading, since it does allow conversion between different dtype
        # kinds in some cases. In particular, conversion from int to float is
        # allowed.
        return array([value]).astype(dtype=dtype, casting="same_kind")[0]
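

# A minimal sketch (not part of the original module) of the 'same_kind'
# casting behavior that ``_coerce_to_dtype`` relies on: int -> float64 is
# permitted, while str -> float64 is rejected. Run it manually, e.g. from a
# REPL.
def _demo_coerce_to_dtype():
    # Int values are promoted to float64: 'same_kind' allows int -> float, as
    # noted in the comment inside _coerce_to_dtype above.
    assert _coerce_to_dtype(1, float64_dtype) == 1.0

    # Strings cannot be cast to float64 under 'same_kind', so numpy raises
    # TypeError, which callers such as validate_dtype() and fillna() re-raise
    # with additional context.
    try:
        _coerce_to_dtype("a", float64_dtype)
    except TypeError:
        pass
    else:
        raise AssertionError("expected str -> float64 coercion to fail")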