# Source code for zipline.pipeline.pipeline

from zipline.errors import UnsupportedPipelineOutput
from zipline.utils.input_validation import (
    expect_element,
    expect_types,
    optional,
)

from .domain import Domain, GENERIC, infer_domain
from .graph import ExecutionPlan, TermGraph, SCREEN_NAME
from .filters import Filter
from .term import AssetExists, ComputableTerm, Term


class Pipeline:
    """
    A Pipeline object represents a collection of named expressions to be
    compiled and executed by a PipelineEngine.

    A Pipeline has two important attributes: 'columns', a dictionary of named
    :class:`~zipline.pipeline.Term` instances, and 'screen', a
    :class:`~zipline.pipeline.Filter` representing criteria for
    including an asset in the results of a Pipeline.

    To compute a pipeline in the context of a TradingAlgorithm, users must call
    ``attach_pipeline`` in their ``initialize`` function to register that the
    pipeline should be computed each trading day. The most recent outputs of an
    attached pipeline can be retrieved by calling ``pipeline_output`` from
    ``handle_data``, ``before_trading_start``, or a scheduled function.

    Parameters
    ----------
    columns : dict, optional
        Initial columns. The mapping is shallow-copied, so later calls to
        :meth:`add` / :meth:`remove` do not modify the caller's dict.
    screen : zipline.pipeline.Filter, optional
        Initial screen.
    domain : zipline.pipeline.domain.Domain, optional
        Explicit domain for this pipeline. Defaults to ``GENERIC``, in which
        case :meth:`domain` falls back to inference from the output terms.

    Raises
    ------
    UnsupportedPipelineOutput
        If any initial column has ``ndim == 1``.
    TypeError
        If any initial column is not a ``ComputableTerm``.
    """

    __slots__ = ("_columns", "_screen", "_domain", "__weakref__")

    @expect_types(columns=optional(dict), screen=optional(Filter), domain=Domain)
    def __init__(self, columns=None, screen=None, domain=GENERIC):
        if columns is None:
            columns = {}

        validate_column = self.validate_column
        for column_name, term in columns.items():
            validate_column(column_name, term)
            if not isinstance(term, ComputableTerm):
                raise TypeError(
                    "Column {column_name!r} contains an invalid pipeline term "
                    "({term}). Did you mean to append '.latest'?".format(
                        column_name=column_name,
                        term=term,
                    )
                )

        # Keep a private shallow copy: ``add``/``remove`` mutate this mapping,
        # and sharing it with the caller would both leak those mutations and
        # let unvalidated terms be inserted after construction.
        self._columns = columns.copy()
        self._screen = screen
        self._domain = domain

    @property
    def columns(self):
        """The output columns of this pipeline.

        Returns
        -------
        columns : dict[str, zipline.pipeline.ComputableTerm]
            Map from column name to expression computing that column's output.
        """
        return self._columns

    @property
    def screen(self):
        """
        The screen of this pipeline.

        Returns
        -------
        screen : zipline.pipeline.Filter or None
            Term defining the screen for this pipeline. If ``screen`` is a
            filter, rows that do not pass the filter (i.e., rows for which the
            filter computed ``False``) will be dropped from the output of this
            pipeline before returning results.

        Notes
        -----
        Setting a screen on a Pipeline does not change the values produced for
        any rows: it only affects whether a given row is returned. Computing a
        pipeline with a screen is logically equivalent to computing the
        pipeline without the screen and then, as a post-processing-step,
        filtering out any rows for which the screen computed ``False``.
        """
        return self._screen

    @expect_types(term=Term, name=str)
    def add(self, term, name, overwrite=False):
        """Add a column.

        The results of computing ``term`` will show up as a column in the
        DataFrame produced by running this pipeline.

        Parameters
        ----------
        term : zipline.pipeline.Term
            A Filter, Factor, or Classifier to add to the pipeline.
        name : str
            Name of the column to add.
        overwrite : bool
            Whether to overwrite the existing entry if we already have a column
            named `name`.

        Raises
        ------
        UnsupportedPipelineOutput
            If ``term.ndim == 1``.
        KeyError
            If a column named ``name`` already exists and ``overwrite`` is
            false.
        TypeError
            If ``term`` is not a ``ComputableTerm``.

        Notes
        -----
        All validation happens before the pipeline is modified, so a failed
        call leaves the existing columns untouched.
        """
        self.validate_column(name, term)

        already_present = name in self.columns
        if already_present and not overwrite:
            raise KeyError("Column '{}' already exists.".format(name))

        if not isinstance(term, ComputableTerm):
            raise TypeError(
                "{term} is not a valid pipeline column. Did you mean to "
                "append '.latest'?".format(term=term)
            )

        # Only mutate once every check has passed. Removing first (rather than
        # assigning in place) keeps the established behaviour that an
        # overwritten column moves to the end of the column ordering.
        if already_present:
            self.remove(name)
        self._columns[name] = term

    @expect_types(name=str)
    def remove(self, name):
        """Remove a column.

        Parameters
        ----------
        name : str
            The name of the column to remove.

        Raises
        ------
        KeyError
            If `name` is not in self.columns.

        Returns
        -------
        removed : zipline.pipeline.Term
            The removed term.
        """
        return self.columns.pop(name)

    @expect_types(screen=Filter, overwrite=(bool, int))
    def set_screen(self, screen, overwrite=False):
        """Set a screen on this Pipeline.

        Parameters
        ----------
        screen : zipline.pipeline.Filter
            The filter to apply as a screen.
        overwrite : bool
            Whether to overwrite any existing screen.  If overwrite is False
            and self.screen is not None, we raise an error.

        Raises
        ------
        ValueError
            If a screen is already set and ``overwrite`` is false.
        """
        if self._screen is not None and not overwrite:
            raise ValueError(
                "set_screen() called with overwrite=False and screen already "
                "set.\n"
                "If you want to apply multiple filters as a screen use "
                "set_screen(filter1 & filter2 & ...).\n"
                "If you want to replace the previous screen with a new one, "
                "use set_screen(new_filter, overwrite=True)."
            )
        self._screen = screen

    def to_execution_plan(self, domain, default_screen, start_date, end_date):
        """
        Compile into an ExecutionPlan.

        Parameters
        ----------
        domain : zipline.pipeline.domain.Domain
            Domain on which the pipeline will be executed.
        default_screen : zipline.pipeline.Term
            Term to use as a screen if self.screen is None.
        start_date : pd.Timestamp
            The first date of requested output.
        end_date : pd.Timestamp
            The last date of requested output.

        Returns
        -------
        graph : zipline.pipeline.graph.ExecutionPlan
            Graph encoding term dependencies, including metadata about extra
            row requirements.

        Raises
        ------
        AssertionError
            If this pipeline was constructed with a non-generic domain that is
            not the same object as ``domain``.
        """
        # Identity comparison: an explicit construction-time domain must be
        # the very domain we are compiling for.
        if self._domain is not GENERIC and self._domain is not domain:
            raise AssertionError(
                "Attempted to compile Pipeline with domain {} to execution "
                "plan with different domain {}.".format(self._domain, domain)
            )

        return ExecutionPlan(
            domain=domain,
            terms=self._prepare_graph_terms(default_screen),
            start_date=start_date,
            end_date=end_date,
        )

    def to_simple_graph(self, default_screen):
        """
        Compile into a simple TermGraph with no extra row metadata.

        Parameters
        ----------
        default_screen : zipline.pipeline.Term
            Term to use as a screen if self.screen is None.

        Returns
        -------
        graph : zipline.pipeline.graph.TermGraph
            Graph encoding term dependencies.
        """
        return TermGraph(self._prepare_graph_terms(default_screen))

    def _prepare_graph_terms(self, default_screen):
        """Helper for to_simple_graph and to_execution_plan.

        Returns a copy of the columns mapping with the effective screen
        (``self.screen``, or ``default_screen`` when no screen is set) stored
        under ``SCREEN_NAME``.
        """
        # Work on a copy so the screen entry never leaks into self.columns.
        columns = self.columns.copy()
        screen = self.screen
        if screen is None:
            screen = default_screen
        columns[SCREEN_NAME] = screen
        return columns

    @expect_element(format=("svg", "png", "jpeg"))
    def show_graph(self, format="svg"):
        """
        Render this Pipeline as a DAG.

        Parameters
        ----------
        format : {'svg', 'png', 'jpeg'}
            Image format to render with.  Default is 'svg'.

        Returns
        -------
        The ``svg``, ``png`` or ``jpeg`` attribute of the simple term graph
        built with ``AssetExists()`` as the default screen.
        """
        g = self.to_simple_graph(AssetExists())
        if format == "svg":
            return g.svg
        elif format == "png":
            return g.png
        elif format == "jpeg":
            return g.jpeg
        else:
            # We should never get here because of the expect_element decorator
            # above.
            raise AssertionError("Unknown graph format %r." % format)

    @staticmethod
    @expect_types(term=Term, column_name=str)
    def validate_column(column_name, term):
        """Check that ``term`` may be used as a pipeline output column.

        Parameters
        ----------
        column_name : str
            Name of the column, used for error reporting.
        term : zipline.pipeline.Term
            The candidate column term.

        Raises
        ------
        UnsupportedPipelineOutput
            If ``term.ndim == 1``.
        """
        if term.ndim == 1:
            raise UnsupportedPipelineOutput(column_name=column_name, term=term)

    @property
    def _output_terms(self):
        """
        A list of terms that are outputs of this pipeline.

        Includes all terms registered as data outputs of the pipeline, plus the
        screen, if present.
        """
        terms = list(self._columns.values())
        screen = self.screen
        if screen is not None:
            terms.append(screen)
        return terms

    @expect_types(default=Domain)
    def domain(self, default):
        """
        Get the domain for this pipeline.

        - If an explicit domain was provided at construction time, use it.
        - Otherwise, infer a domain from the registered columns.
        - If no domain can be inferred, return ``default``.

        Parameters
        ----------
        default : zipline.pipeline.domain.Domain
            Domain to use if no domain can be inferred from this pipeline by
            itself.

        Returns
        -------
        domain : zipline.pipeline.domain.Domain
            The domain for the pipeline.

        Raises
        ------
        AmbiguousDomain
            Presumably propagated from ``infer_domain`` when the output terms
            do not agree on a single domain (not raised directly here).
        ValueError
            If the terms in ``self`` conflict with self._domain.
        """
        # Always compute our inferred domain to ensure that it's compatible
        # with our explicit domain.
        inferred = infer_domain(self._output_terms)

        if inferred is GENERIC and self._domain is GENERIC:
            # Both generic. Fall back to default.
            return default
        elif inferred is GENERIC and self._domain is not GENERIC:
            # Use the non-generic domain.
            return self._domain
        elif inferred is not GENERIC and self._domain is GENERIC:
            # Use the non-generic domain.
            return inferred
        else:
            # Both non-generic. They have to match.
            if inferred is not self._domain:
                raise ValueError(
                    "Conflicting domains in Pipeline. Inferred {}, but {} was "
                    "passed at construction.".format(inferred, self._domain)
                )
            return inferred