"""
Computation engines for executing Pipelines.

This module defines the core computation algorithms for executing Pipelines.

The primary entrypoint of this file is SimplePipelineEngine.run_pipeline, which
implements the following algorithm for executing pipelines:

1. Determine the domain of the pipeline. The domain determines the
   top-level set of dates and assets that serve as row- and
   column-labels for the computations performed by this
   pipeline. This logic lives in
   zipline.pipeline.domain.infer_domain.

2. Build a dependency graph of all terms in `pipeline`, with
   information about how many extra rows each term needs from its
   inputs. At this point we also **specialize** any generic
   LoadableTerms to the domain determined in (1). This logic lives in
   zipline.pipeline.graph.TermGraph and
   zipline.pipeline.graph.ExecutionPlan.

3. Combine the domain computed in (1) with our AssetFinder to produce
   a "lifetimes matrix". The lifetimes matrix is a DataFrame of
   booleans whose labels are dates x assets. Each entry corresponds
   to a (date, asset) pair and indicates whether the asset in
   question was tradable on the date in question. This logic
   primarily lives in AssetFinder.lifetimes.

4. Call self._populate_initial_workspace, which produces a
   "workspace" dictionary containing cached or otherwise pre-computed
   terms. By default, the initial workspace contains the lifetimes
   matrix and its date labels.

5. Topologically sort the graph constructed in (2) to produce an
   execution order for any terms that were not pre-populated.  This
   logic lives in TermGraph.

6. Iterate over the terms in the order computed in (5). For each term:

   a. Fetch the term's inputs from the workspace, possibly removing
      unneeded leading rows from the input (see ExecutionPlan.offset
      for details on why we might have extra leading rows).

   b. Call ``term._compute`` with the inputs. Store the results into
      the workspace.

   c. Decrement "reference counts" on the term's inputs, and remove
      their results from the workspace if the refcount hits 0. This
      significantly reduces the maximum amount of memory that we
      consume during execution.

   This logic lives in SimplePipelineEngine.compute_chunk.

7. Extract the pipeline's outputs from the workspace and convert them
   into "narrow" format, with output labels dictated by the Pipeline's
   screen. This logic lives in SimplePipelineEngine._to_narrow.
"""
from abc import ABC, abstractmethod
from functools import partial

import pandas as pd
from numpy import arange, array
from toolz import groupby

from zipline.errors import NoFurtherDataError
from zipline.lib.adjusted_array import ensure_adjusted_array, ensure_ndarray
from zipline.utils.date_utils import compute_date_range_chunks
from zipline.utils.input_validation import expect_types
from zipline.utils.numpy_utils import as_column, repeat_first_axis, repeat_last_axis
from zipline.utils.pandas_utils import categorical_df_concat, explode
from zipline.utils.string_formatting import bulleted_list

from .domain import GENERIC, Domain
from .graph import maybe_specialize
from .hooks import DelegatingHooks
from .term import AssetExists, InputDates, LoadableTerm


class PipelineEngine(ABC):
    @abstractmethod
    def run_pipeline(self, pipeline, start_date, end_date, hooks=None):
        """Compute values for ``pipeline`` from ``start_date`` to ``end_date``.

        Parameters
        ----------
        pipeline : zipline.pipeline.Pipeline
            The pipeline to run.
        start_date : pd.Timestamp
            Start date of the computed matrix.
        end_date : pd.Timestamp
            End date of the computed matrix.
        hooks : list[implements(PipelineHooks)], optional
            Hooks for instrumenting Pipeline execution.

        Returns
        -------
        result : pd.DataFrame
            A frame of computed results.

            The ``result`` columns correspond to the entries of
            `pipeline.columns`, which should be a dictionary mapping strings
            to instances of :class:`zipline.pipeline.Term`.

            For each date between ``start_date`` and ``end_date``, ``result``
            will contain a row for each asset that passed `pipeline.screen`.
            A screen of ``None`` indicates that a row should be returned for
            each asset that existed each day.
        """
        raise NotImplementedError("run_pipeline")

    @abstractmethod
    def run_chunked_pipeline(
        self, pipeline, start_date, end_date, chunksize, hooks=None
    ):
        """Compute values for ``pipeline`` from ``start_date`` to
        ``end_date``, in date chunks of size ``chunksize``.

        Chunked execution reduces memory consumption, and may reduce
        computation time depending on the contents of your pipeline.

        Parameters
        ----------
        pipeline : Pipeline
            The pipeline to run.
        start_date : pd.Timestamp
            The start date to run the pipeline for.
        end_date : pd.Timestamp
            The end date to run the pipeline for.
        chunksize : int
            The number of days to execute at a time.
        hooks : list[implements(PipelineHooks)], optional
            Hooks for instrumenting Pipeline execution.

        Returns
        -------
        result : pd.DataFrame
            A frame of computed results.

            The ``result`` columns correspond to the entries of
            `pipeline.columns`, which should be a dictionary mapping strings
            to instances of :class:`zipline.pipeline.Term`.

            For each date between ``start_date`` and ``end_date``, ``result``
            will contain a row for each asset that passed `pipeline.screen`.
            A screen of ``None`` indicates that a row should be returned for
            each asset that existed each day.

        See Also
        --------
        :meth:`zipline.pipeline.engine.PipelineEngine.run_pipeline`
        """
        raise NotImplementedError("run_chunked_pipeline")


class NoEngineRegistered(Exception):
    """Raised if a user tries to call pipeline_output in an algorithm that
    hasn't set up a pipeline engine.
    """


class ExplodingPipelineEngine(PipelineEngine):
    """A PipelineEngine that doesn't do anything."""

    def run_pipeline(self, pipeline, start_date, end_date, hooks=None):
        raise NoEngineRegistered(
            "Attempted to run a pipeline but no pipeline "
            "resources were registered."
        )

    def run_chunked_pipeline(
        self, pipeline, start_date, end_date, chunksize, hooks=None
    ):
        raise NoEngineRegistered(
            "Attempted to run a chunked pipeline but no pipeline "
            "resources were registered."
        )


def default_populate_initial_workspace(
    initial_workspace, root_mask_term, execution_plan, dates, assets
):
    """The default implementation for ``populate_initial_workspace``.

    This function returns the ``initial_workspace`` argument without making
    any modifications.

    Parameters
    ----------
    initial_workspace : dict[array-like]
        The initial workspace before we have populated it with any cached
        terms.
    root_mask_term : Term
        The root mask term, normally ``AssetExists()``. This is needed to
        compute the dates for individual terms.
    execution_plan : ExecutionPlan
        The execution plan for the pipeline being run.
    dates : pd.DatetimeIndex
        All of the dates being requested in this pipeline run including the
        extra dates for look back windows.
    assets : pd.Int64Index
        All of the assets that exist for the window being computed.

    Returns
    -------
    populated_initial_workspace : dict[term, array-like]
        The workspace to begin computations with.
    """
    return initial_workspace
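

# Illustrative sketch, not part of zipline's API: a factory for a custom
# ``populate_initial_workspace`` that layers pre-computed term results on top
# of the default workspace. ``precomputed`` is a hypothetical dict mapping
# Terms to arrays shaped like the root mask for this run; terms seeded this
# way are treated as already computed and are not recomputed by the engine.
def example_populate_from_cache(precomputed):
    def populate_initial_workspace(
        initial_workspace, root_mask_term, execution_plan, dates, assets
    ):
        # Start from the engine-provided workspace (root mask and its dates)
        # and add the cached results. The caller is responsible for ensuring
        # the cached arrays match the dates/assets of this particular run.
        workspace = dict(initial_workspace)
        workspace.update(precomputed)
        return workspace

    return populate_initial_workspace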


class SimplePipelineEngine(PipelineEngine):
    """PipelineEngine class that computes each term independently.

    Parameters
    ----------
    get_loader : callable
        A function that is given a loadable term and returns a PipelineLoader
        to use to retrieve raw data for that term.
    asset_finder : zipline.assets.AssetFinder
        An AssetFinder instance. We depend on the AssetFinder to determine
        which assets are in the top-level universe at any point in time.
    populate_initial_workspace : callable, optional
        A function which will be used to populate the initial workspace when
        computing a pipeline. See
        :func:`zipline.pipeline.engine.default_populate_initial_workspace`
        for more info.
    default_hooks : list, optional
        List of hooks that should be used to instrument all pipelines executed
        by this engine.

    See Also
    --------
    :func:`zipline.pipeline.engine.default_populate_initial_workspace`
    """

    __slots__ = (
        "_get_loader",
        "_finder",
        "_root_mask_term",
        "_root_mask_dates_term",
        "_populate_initial_workspace",
    )

    @expect_types(
        default_domain=Domain,
        __funcname="SimplePipelineEngine",
    )
    def __init__(
        self,
        get_loader,
        asset_finder,
        default_domain=GENERIC,
        populate_initial_workspace=None,
        default_hooks=None,
    ):
        self._get_loader = get_loader
        self._finder = asset_finder

        self._root_mask_term = AssetExists()
        self._root_mask_dates_term = InputDates()

        self._populate_initial_workspace = (
            populate_initial_workspace or default_populate_initial_workspace
        )
        self._default_domain = default_domain

        if default_hooks is None:
            self._default_hooks = []
        else:
            self._default_hooks = list(default_hooks)
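
    # Illustrative sketch of the ``get_loader`` contract (the loader objects
    # and ``MyFundamentals`` dataset below are placeholders, not zipline
    # APIs): the engine calls ``get_loader(term)`` with a LoadableTerm
    # (typically a BoundColumn) and expects a PipelineLoader back. A common
    # pattern is to dispatch on ``term.dataset``:
    #
    #     def make_get_loader(pricing_loader, fundamentals_loader):
    #         def get_loader(column):
    #             if column.dataset == USEquityPricing:
    #                 return pricing_loader
    #             elif column.dataset == MyFundamentals:  # placeholder dataset
    #                 return fundamentals_loader
    #             raise ValueError("No loader registered for %s" % column)
    #         return get_loader
    #
    #     engine = SimplePipelineEngine(
    #         get_loader=make_get_loader(pricing_loader, fundamentals_loader),
    #         asset_finder=asset_finder,
    #     )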

    def run_chunked_pipeline(
        self, pipeline, start_date, end_date, chunksize, hooks=None
    ):
        """Compute values for ``pipeline`` from ``start_date`` to
        ``end_date``, in date chunks of size ``chunksize``.

        Chunked execution reduces memory consumption, and may reduce
        computation time depending on the contents of your pipeline.

        Parameters
        ----------
        pipeline : Pipeline
            The pipeline to run.
        start_date : pd.Timestamp
            The start date to run the pipeline for.
        end_date : pd.Timestamp
            The end date to run the pipeline for.
        chunksize : int
            The number of days to execute at a time.
        hooks : list[implements(PipelineHooks)], optional
            Hooks for instrumenting Pipeline execution.

        Returns
        -------
        result : pd.DataFrame
            A frame of computed results.

            The ``result`` columns correspond to the entries of
            `pipeline.columns`, which should be a dictionary mapping strings
            to instances of :class:`zipline.pipeline.Term`.

            For each date between ``start_date`` and ``end_date``, ``result``
            will contain a row for each asset that passed `pipeline.screen`.
            A screen of ``None`` indicates that a row should be returned for
            each asset that existed each day.

        See Also
        --------
        :meth:`zipline.pipeline.engine.PipelineEngine.run_pipeline`
        """
        domain = self.resolve_domain(pipeline)
        ranges = compute_date_range_chunks(
            domain.sessions(),
            start_date,
            end_date,
            chunksize,
        )
        hooks = self._resolve_hooks(hooks)
        run_pipeline = partial(self._run_pipeline_impl, pipeline, hooks=hooks)

        with hooks.running_pipeline(pipeline, start_date, end_date):
            chunks = [run_pipeline(s, e) for s, e in ranges]

        if len(chunks) == 1:
            # OPTIMIZATION: Don't make an extra copy in `categorical_df_concat`
            # if we don't have to.
            return chunks[0]

        # Filter out empty chunks. Empty dataframes lose dtype information,
        # which makes concatenation fail.
        nonempty_chunks = [c for c in chunks if len(c)]
        return categorical_df_concat(nonempty_chunks, inplace=True)
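
    # Example call (a sketch; the engine and pipeline objects are assumed to
    # exist as in the module-level example above): running a multi-year
    # pipeline in roughly half-year chunks, which bounds peak memory usage:
    #
    #     result = engine.run_chunked_pipeline(
    #         pipe,
    #         pd.Timestamp("2018-01-02"),
    #         pd.Timestamp("2020-12-31"),
    #         chunksize=126,
    #     )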

    def run_pipeline(self, pipeline, start_date, end_date, hooks=None):
        """Compute values for ``pipeline`` from ``start_date`` to ``end_date``.

        Parameters
        ----------
        pipeline : zipline.pipeline.Pipeline
            The pipeline to run.
        start_date : pd.Timestamp
            Start date of the computed matrix.
        end_date : pd.Timestamp
            End date of the computed matrix.
        hooks : list[implements(PipelineHooks)], optional
            Hooks for instrumenting Pipeline execution.

        Returns
        -------
        result : pd.DataFrame
            A frame of computed results.

            The ``result`` columns correspond to the entries of
            `pipeline.columns`, which should be a dictionary mapping strings
            to instances of :class:`zipline.pipeline.Term`.

            For each date between ``start_date`` and ``end_date``, ``result``
            will contain a row for each asset that passed `pipeline.screen`.
            A screen of ``None`` indicates that a row should be returned for
            each asset that existed each day.
        """
        hooks = self._resolve_hooks(hooks)
        with hooks.running_pipeline(pipeline, start_date, end_date):
            return self._run_pipeline_impl(
                pipeline,
                start_date,
                end_date,
                hooks,
            )

    def _run_pipeline_impl(self, pipeline, start_date, end_date, hooks):
        """Shared core for ``run_pipeline`` and ``run_chunked_pipeline``."""
        # See notes at the top of this module for a description of the
        # algorithm implemented here.
        if end_date < start_date:
            raise ValueError(
                "start_date must be before or equal to end_date\n"
                f"start_date={start_date}, end_date={end_date}"
            )

        domain = self.resolve_domain(pipeline)

        plan = pipeline.to_execution_plan(
            domain,
            self._root_mask_term,
            start_date,
            end_date,
        )
        extra_rows = plan.extra_rows[self._root_mask_term]
        root_mask = self._compute_root_mask(
            domain,
            start_date,
            end_date,
            extra_rows,
        )
        dates, sids, root_mask_values = explode(root_mask)

        workspace = self._populate_initial_workspace(
            {
                self._root_mask_term: root_mask_values,
                self._root_mask_dates_term: as_column(dates.values),
            },
            self._root_mask_term,
            plan,
            dates,
            sids,
        )

        refcounts = plan.initial_refcounts(workspace)
        execution_order = plan.execution_order(workspace, refcounts)

        with hooks.computing_chunk(execution_order, start_date, end_date):
            results = self.compute_chunk(
                graph=plan,
                dates=dates,
                sids=sids,
                workspace=workspace,
                refcounts=refcounts,
                execution_order=execution_order,
                hooks=hooks,
            )

        return self._to_narrow(
            plan.outputs,
            results,
            results.pop(plan.screen_name),
            dates[extra_rows:],
            sids,
        )

    def _compute_root_mask(self, domain, start_date, end_date, extra_rows):
        """Compute a lifetimes matrix from our AssetFinder, then drop columns
        that didn't exist at all during the query dates.

        Parameters
        ----------
        domain : zipline.pipeline.domain.Domain
            Domain for which we're computing a pipeline.
        start_date : pd.Timestamp
            Base start date for the matrix.
        end_date : pd.Timestamp
            End date for the matrix.
        extra_rows : int
            Number of extra rows to compute before `start_date`.
            Extra rows are needed by terms like moving averages that require a
            trailing window of data.

        Returns
        -------
        lifetimes : pd.DataFrame
            Frame of dtype `bool` containing dates from `extra_rows` days
            before `start_date`, continuing through to `end_date`. The
            returned frame contains as columns all assets in our AssetFinder
            that existed for at least one day between `start_date` and
            `end_date`.
        """
        sessions = domain.sessions()

        if start_date not in sessions:
            raise ValueError(
                f"Pipeline start date ({start_date}) is not a trading session "
                f"for domain {domain}."
            )
        elif end_date not in sessions:
            raise ValueError(
                f"Pipeline end date ({end_date}) is not a trading session "
                f"for domain {domain}."
            )

        start_idx, end_idx = sessions.slice_locs(start_date, end_date)
        if start_idx < extra_rows:
            raise NoFurtherDataError.from_lookback_window(
                initial_message="Insufficient data to compute Pipeline:",
                first_date=sessions[0],
                lookback_start=start_date,
                lookback_length=extra_rows,
            )

        # NOTE: This logic should probably be delegated to the domain once we
        # start adding more complex domains.
        #
        # Build lifetimes matrix reaching back to `extra_rows` days before
        # `start_date`.
        finder = self._finder
        lifetimes = finder.lifetimes(
            sessions[start_idx - extra_rows : end_idx],
            include_start_date=False,
            country_codes=(domain.country_code,),
        )

        if not lifetimes.columns.is_unique:
            columns = lifetimes.columns
            duplicated = columns[columns.duplicated()].unique()
            raise AssertionError("Duplicated sids: %s" % duplicated)

        # Filter out columns that didn't exist from the farthest look back
        # window through the end of the requested dates.
        existed = lifetimes.any()
        ret = lifetimes.loc[:, existed]
        num_assets = ret.shape[1]

        if num_assets == 0:
            raise ValueError(
                "Failed to find any assets with country_code {!r} that traded "
                "between {} and {}.\n"
                "This probably means that your asset db is old or that it has "
                "incorrect country/exchange metadata.".format(
                    domain.country_code,
                    start_date,
                    end_date,
                )
            )

        return ret

    @staticmethod
    def _inputs_for_term(term, workspace, graph, domain, refcounts):
        """Compute inputs for the given term.

        This is mostly complicated by the fact that for each input we store
        as many rows as will be necessary to serve **any** computation
        requiring that input.
        """
        offsets = graph.offset
        out = []

        # We need to specialize here because we don't change ComputableTerm
        # after resolving domains, so they can still contain generic terms as
        # inputs.
        specialized = [maybe_specialize(t, domain) for t in term.inputs]

        if term.windowed:
            # If term is windowed, then all input data should be instances of
            # AdjustedArray.
            for input_ in specialized:
                adjusted_array = ensure_adjusted_array(
                    workspace[input_],
                    input_.missing_value,
                )
                out.append(
                    adjusted_array.traverse(
                        window_length=term.window_length,
                        offset=offsets[term, input_],
                        # If the refcount for the input is > 1, we will need
                        # to traverse this array again so we must copy.
                        # If the refcount for the input == 0, this is the last
                        # traversal that will happen so we can invalidate
                        # the AdjustedArray and mutate the data in place.
                        copy=refcounts[input_] > 1,
                    )
                )
        else:
            # If term is not windowed, input_data may be an AdjustedArray or
            # np.ndarray. Coerce the former to the latter.
            for input_ in specialized:
                input_data = ensure_ndarray(workspace[input_])
                offset = offsets[term, input_]
                input_data = input_data[offset:]
                if refcounts[input_] > 1:
                    input_data = input_data.copy()
                out.append(input_data)

        return out

    def compute_chunk(
        self, graph, dates, sids, workspace, refcounts, execution_order, hooks
    ):
        """Compute the Pipeline terms in the graph for the requested start and
        end dates.

        This is where we do the actual work of running a pipeline.

        Parameters
        ----------
        graph : zipline.pipeline.graph.ExecutionPlan
            Dependency graph of the terms to be executed.
        dates : pd.DatetimeIndex
            Row labels for our root mask.
        sids : pd.Int64Index
            Column labels for our root mask.
        workspace : dict
            Map from term -> output.
            Must contain at least an entry for `self._root_mask_term` whose
            shape is `(len(dates), len(assets))`, but may contain additional
            pre-computed terms for testing or optimization purposes.
        refcounts : dict[Term, int]
            Dictionary mapping terms to number of dependent terms. When a
            term's refcount hits 0, it can be safely discarded from
            ``workspace``. See TermGraph.decref_dependencies for more info.
        execution_order : list[Term]
            Order in which to execute terms.
        hooks : implements(PipelineHooks)
            Hooks to instrument pipeline execution.

        Returns
        -------
        results : dict
            Dictionary mapping requested results to outputs.
        """
        self._validate_compute_chunk_params(graph, dates, sids, workspace)

        get_loader = self._get_loader

        # Copy the supplied initial workspace so we don't mutate it in place.
        workspace = workspace.copy()
        domain = graph.domain

        # Many loaders can fetch data more efficiently if we ask them to
        # retrieve all their inputs at once. For example, a loader backed by a
        # SQL database can fetch multiple columns from the database in a
        # single query.
        #
        # To enable these loaders to fetch their data efficiently, we group
        # together requests for LoadableTerms if they are provided by the same
        # loader and they require the same number of extra rows.
        #
        # The extra rows condition is a simplification: we don't currently
        # have a mechanism for asking a loader to fetch different windows of
        # data for different terms, so we only batch requests together when
        # they're going to produce data for the same set of dates.
        def loader_group_key(term):
            loader = get_loader(term)
            extra_rows = graph.extra_rows[term]
            return loader, extra_rows

        # Only produce loader groups for the terms we expect to load. This
        # ensures that we can run pipelines for graphs where we don't have a
        # loader registered for an atomic term if all the dependencies of that
        # term were supplied in the initial workspace.
        will_be_loaded = graph.loadable_terms - workspace.keys()
        loader_groups = groupby(
            loader_group_key,
            (t for t in execution_order if t in will_be_loaded),
        )

        for term in execution_order:
            # `term` may have been supplied in `initial_workspace`, or we may
            # have loaded `term` as part of a batch with another term coming
            # from the same loader (see note on loader_group_key above). In
            # either case, we already have the term computed, so don't
            # recompute.
            if term in workspace:
                continue

            # Asset labels are always the same, but date labels vary by how
            # many extra rows are needed.
            mask, mask_dates = graph.mask_and_dates_for_term(
                term,
                self._root_mask_term,
                workspace,
                dates,
            )

            if isinstance(term, LoadableTerm):
                loader = get_loader(term)
                to_load = sorted(
                    loader_groups[loader_group_key(term)],
                    key=lambda t: t.dataset,
                )
                self._ensure_can_load(loader, to_load)
                with hooks.loading_terms(to_load):
                    loaded = loader.load_adjusted_array(
                        domain,
                        to_load,
                        mask_dates,
                        sids,
                        mask,
                    )
                assert set(loaded) == set(to_load), (
                    "loader did not return an AdjustedArray for each column\n"
                    "expected: %r\n"
                    "got:      %r"
                    % (
                        sorted(to_load, key=repr),
                        sorted(loaded, key=repr),
                    )
                )
                workspace.update(loaded)
            else:
                with hooks.computing_term(term):
                    workspace[term] = term._compute(
                        self._inputs_for_term(
                            term,
                            workspace,
                            graph,
                            domain,
                            refcounts,
                        ),
                        mask_dates,
                        sids,
                        mask,
                    )
                if term.ndim == 2:
                    assert workspace[term].shape == mask.shape
                else:
                    assert workspace[term].shape == (mask.shape[0], 1)

                # Decref dependencies of ``term``, and clear any terms
                # whose refcounts hit 0.
                for garbage in graph.decref_dependencies(term, refcounts):
                    del workspace[garbage]

        # At this point, all the output terms are in the workspace.
        out = {}
        graph_extra_rows = graph.extra_rows
        for name, term in graph.outputs.items():
            # Truncate off extra rows from outputs.
            out[name] = workspace[term][graph_extra_rows[term]:]
        return out

    def _to_narrow(self, terms, data, mask, dates, assets):
        """Convert raw computed pipeline results into a DataFrame for public
        APIs.

        Parameters
        ----------
        terms : dict[str -> Term]
            Dict mapping column names to terms.
        data : dict[str -> ndarray[ndim=2]]
            Dict mapping column names to computed results for those names.
        mask : ndarray[bool, ndim=2]
            Mask array of values to keep.
        dates : ndarray[datetime64, ndim=1]
            Row index for arrays `data` and `mask`.
        assets : ndarray[int64, ndim=1]
            Column index for arrays `data` and `mask`.

        Returns
        -------
        results : pd.DataFrame
            The indices of `results` are as follows:

            index : two-tiered MultiIndex of (date, asset).
                Contains an entry for each (date, asset) pair corresponding to
                a `True` value in `mask`.
            columns : Index of str
                One column per entry in `data`.

            If mask[date, asset] is True, then
            result.loc[(date, asset), colname] will contain the value of
            data[colname][date, asset].
        """
        if not mask.any():
            # Manually handle the empty DataFrame case. This is a workaround
            # to pandas failing to tz_localize an empty dataframe with a
            # MultiIndex. It also saves us the work of applying a known-empty
            # mask to each array.
            #
            # Slicing `dates` here to preserve pandas metadata.
            empty_dates = dates[:0]
            empty_assets = array([], dtype=object)
            return pd.DataFrame(
                data={
                    name: array([], dtype=arr.dtype)
                    for name, arr in data.items()
                },
                index=pd.MultiIndex.from_arrays([empty_dates, empty_assets]),
            )

        # if "open_instance" in data.keys():
        #     data["open_instance"].tofile("../../open_instance.dat")

        final_columns = {}
        for name in data:
            # Each term that computed an output has its postprocess method
            # called on the filtered result.
            #
            # Using this to convert np.records to tuples
            final_columns[name] = terms[name].postprocess(data[name][mask])

        resolved_assets = array(self._finder.retrieve_all(assets))
        index = _pipeline_output_index(dates, resolved_assets, mask)

        return pd.DataFrame(
            data=final_columns, index=index, columns=final_columns.keys()
        )

    def _validate_compute_chunk_params(self, graph, dates, sids, initial_workspace):
        """Verify that the values passed to compute_chunk are well-formed."""
        root = self._root_mask_term
        clsname = type(self).__name__

        # Writing this out explicitly so this errors in testing if we change
        # the name without updating this line.
        compute_chunk_name = self.compute_chunk.__name__
        if root not in initial_workspace:
            raise AssertionError(
                "root_mask values not supplied to {cls}.{method}".format(
                    cls=clsname,
                    method=compute_chunk_name,
                )
            )

        shape = initial_workspace[root].shape
        implied_shape = len(dates), len(sids)
        if shape != implied_shape:
            raise AssertionError(
                "root_mask shape is {shape}, but received dates/assets "
                "imply that shape should be {implied}".format(
                    shape=shape,
                    implied=implied_shape,
                )
            )

        for term in initial_workspace:
            if self._is_special_root_term(term):
                continue

            if term.domain is GENERIC:
                # XXX: We really shouldn't allow **any** generic terms to be
                # populated in the initial workspace. A generic term, by
                # definition, can't correspond to concrete data until it's
                # paired with a domain, and populate_initial_workspace isn't
                # given the domain of execution, so it can't possibly know
                # what data to use when populating a generic term.
                #
                # In our current implementation, however, we don't have a good
                # way to represent specializations of ComputableTerms that
                # take only generic inputs, so there's no good way for the
                # initial workspace to provide data for such terms except by
                # populating the generic ComputableTerm.
                #
                # The right fix for the above is to implement "full
                # specialization", i.e., implementing ``specialize`` uniformly
                # across all terms, not just LoadableTerms. Having full
                # specialization will also remove the need for all of the
                # remaining ``maybe_specialize`` calls floating around in this
                # file.
                #
                # In the meantime, disallowing ComputableTerms in the initial
                # workspace would break almost every test in
                # `test_filter`/`test_factor`/`test_classifier`, and fixing
                # them would require updating all those tests to compute with
                # more specialized terms. Once we have full specialization, we
                # can fix all the tests without a large volume of edits by
                # simply specializing their workspaces, so for now I'm leaving
                # this in place as a somewhat sharp edge.
                if isinstance(term, LoadableTerm):
                    raise ValueError(
                        "Loadable workspace terms must be specialized to a "
                        "domain, but got generic term {}".format(term)
                    )

            elif term.domain != graph.domain:
                raise ValueError(
                    "Initial workspace term {} has domain {}. "
                    "Does not match pipeline domain {}".format(
                        term,
                        term.domain,
                        graph.domain,
                    )
                )

    def resolve_domain(self, pipeline):
        """Resolve a concrete domain for ``pipeline``."""
        domain = pipeline.domain(default=self._default_domain)
        if domain is GENERIC:
            raise ValueError(
                "Unable to determine domain for Pipeline.\n"
                "Pass domain=<desired domain> to your Pipeline to set a "
                "domain."
            )
        return domain

    def _is_special_root_term(self, term):
        return term is self._root_mask_term or term is self._root_mask_dates_term

    def _resolve_hooks(self, hooks):
        if hooks is None:
            hooks = []
        return DelegatingHooks(self._default_hooks + hooks)

    def _ensure_can_load(self, loader, terms):
        """Ensure that ``loader`` can load ``terms``."""
        if not loader.currency_aware:
            bad = [t for t in terms if t.currency_conversion is not None]
            if bad:
                raise ValueError(
                    "Requested currency conversion is not supported for the "
                    "following terms:\n{}".format(bulleted_list(bad))
                )


def _pipeline_output_index(dates, assets, mask):
    """Create a MultiIndex for a pipeline output.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        Row labels for ``mask``.
    assets : pd.Index
        Column labels for ``mask``.
    mask : np.ndarray[bool]
        Mask array indicating date/asset pairs that should be included in
        output index.

    Returns
    -------
    index : pd.MultiIndex
        MultiIndex containing (date, asset) pairs corresponding to ``True``
        values in ``mask``.
    """
    date_labels = repeat_last_axis(arange(len(dates)), len(assets))[mask]
    asset_labels = repeat_first_axis(arange(len(assets)), len(dates))[mask]
    return pd.MultiIndex(
        [dates, assets],
        [date_labels, asset_labels],
        # TODO: We should probably add names for these.
        names=[None, None],
        verify_integrity=False,
    )
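

# Worked example of the index construction above (a sketch shown as comments,
# not executed by this module): with two dates, three assets, and
#
#     mask = array([[True, False, True],
#                   [False, True, True]])
#
# ``repeat_last_axis(arange(2), 3)`` broadcasts the row (date) positions to
# shape (2, 3) and ``repeat_first_axis(arange(3), 2)`` broadcasts the column
# (asset) positions to the same shape. Indexing both with ``mask`` selects the
# label positions [0, 0, 1, 1] and [0, 2, 1, 2] in row-major order, so the
# resulting MultiIndex contains:
#
#     (dates[0], assets[0]), (dates[0], assets[2]),
#     (dates[1], assets[1]), (dates[1], assets[2])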