Source code for zipline.pipeline.loaders.frame

"""
PipelineLoader accepting a DataFrame as input.
"""
from functools import partial

from interface import implements
import numpy as np
import pandas as pd

from zipline.lib.adjusted_array import AdjustedArray
from zipline.lib.adjustment import make_adjustment_from_labels
from zipline.utils.numpy_utils import as_column
from .base import PipelineLoader

ADJUSTMENT_COLUMNS = pd.Index(
    [
        "sid",
        "value",
        "kind",
        "start_date",
        "end_date",
        "apply_date",
    ]
)


class DataFrameLoader(implements(PipelineLoader)):
    """A PipelineLoader that reads its input from DataFrames.

    Mostly useful for testing, but can also be used for real work if your
    data fits in memory.

    Parameters
    ----------
    column : zipline.pipeline.data.BoundColumn
        The column whose data is loadable by this loader.
    baseline : pandas.DataFrame
        A DataFrame with index of type DatetimeIndex and columns of type
        Int64Index. Dates should be labelled with the first date on which a
        value would be **available** to an algorithm. This means that OHLCV
        data should generally be shifted back by a trading day before being
        supplied to this class.
    adjustments : pandas.DataFrame, default=None
        A DataFrame with the following columns:
            sid : int
            value : any
            kind : int (zipline.pipeline.loaders.frame.ADJUSTMENT_TYPES)
            start_date : datetime64 (can be NaT)
            end_date : datetime64 (must be set)
            apply_date : datetime64 (must be set)

        The default of None is interpreted as "no adjustments to the
        baseline".
    """

    def __init__(self, column, baseline, adjustments=None):
        self.column = column
        self.baseline = baseline.values.astype(self.column.dtype)
        self.dates = baseline.index
        self.assets = baseline.columns

        if adjustments is None:
            adjustments = pd.DataFrame(
                index=pd.DatetimeIndex([]),
                columns=ADJUSTMENT_COLUMNS,
            )
        else:
            # Ensure that columns are in the correct order.
            adjustments = adjustments.reindex(ADJUSTMENT_COLUMNS, axis=1)
            adjustments.sort_values(["apply_date", "sid"], inplace=True)

        self.adjustments = adjustments
        self.adjustment_apply_dates = pd.DatetimeIndex(adjustments.apply_date)
        self.adjustment_end_dates = pd.DatetimeIndex(adjustments.end_date)
        self.adjustment_sids = pd.Index(adjustments.sid, dtype="int64")

    def format_adjustments(self, dates, assets):
        """Build a dict of Adjustment objects in the format expected by
        AdjustedArray.

        Returns a dict of the form:
        {
            # Integer index into `dates` for the date on which we should
            # apply the list of adjustments.
            1 : [
                Float64Multiply(first_row=2, last_row=4, col=3, value=0.5),
                Float64Overwrite(first_row=3, last_row=5, col=1, value=2.0),
                ...
            ],
            ...
        }
        """
        make_adjustment = partial(make_adjustment_from_labels, dates, assets)

        min_date, max_date = dates[[0, -1]]
        # TODO: Consider porting this to Cython.
        if len(self.adjustments) == 0:
            return {}

        # Mask for adjustments whose apply_dates are in the requested window
        # of dates.
        date_bounds = self.adjustment_apply_dates.slice_indexer(
            min_date,
            max_date,
        )
        dates_filter = np.zeros(len(self.adjustments), dtype="bool")
        dates_filter[date_bounds] = True
        # Ignore adjustments whose apply_date is in range, but whose end_date
        # is out of range.
        dates_filter &= self.adjustment_end_dates >= min_date

        # Mask for adjustments whose sids are in the requested assets.
        sids_filter = self.adjustment_sids.isin(assets.values)

        adjustments_to_use = self.adjustments.loc[
            dates_filter & sids_filter
        ].set_index("apply_date")

        # For each apply_date on which we have an adjustment, compute
        # the integer index of that adjustment's apply_date in `dates`.
        # Then build a list of Adjustment objects for that apply_date.
        # This logic relies on the sorting applied on the previous line.
        out = {}
        previous_apply_date = object()
        for row in adjustments_to_use.itertuples():
            # This expansion depends on the ordering of the DataFrame columns,
            # defined above.
            apply_date, sid, value, kind, start_date, end_date = row
            if apply_date != previous_apply_date:
                # Get the next apply date if no exact match.
                row_loc = dates.get_indexer([apply_date], method="bfill")[0]
                current_date_adjustments = out[row_loc] = []
                previous_apply_date = apply_date

            # Look up the appropriate Adjustment constructor based on the
            # value of `kind`.
            current_date_adjustments.append(
                make_adjustment(start_date, end_date, sid, kind, value)
            )
        return out

    def load_adjusted_array(self, domain, columns, dates, sids, mask):
        """Load data from our stored baseline."""
        if len(columns) != 1:
            raise ValueError("Can't load multiple columns with DataFrameLoader")

        column = columns[0]
        self._validate_input_column(column)

        date_indexer = self.dates.get_indexer(dates)
        assets_indexer = self.assets.get_indexer(sids)

        # Boolean arrays with True on matched entries.
        good_dates = date_indexer != -1
        good_assets = assets_indexer != -1

        data = self.baseline[np.ix_(date_indexer, assets_indexer)]
        mask = (good_assets & as_column(good_dates)) & mask

        # Mask out requested columns/rows that didn't match.
        data[~mask] = column.missing_value

        return {
            column: AdjustedArray(
                # Pull out requested columns/rows from our baseline data.
                data=data,
                adjustments=self.format_adjustments(dates, sids),
                missing_value=column.missing_value,
            ),
        }

    def _validate_input_column(self, column):
        """Make sure a passed column is our column."""
        if column != self.column and column.unspecialize() != self.column:
            raise ValueError(f"Can't load unknown column {column}")
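
The baseline and adjustments formats described in the class docstring look like this in practice. The following is a minimal sketch, assuming the MULTIPLY adjustment kind exported by zipline.lib.adjustment and the built-in USEquityPricing dataset; the sids, dates, and adjustment values are made up for illustration.

import numpy as np
import pandas as pd

from zipline.lib.adjustment import MULTIPLY
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.loaders.frame import ADJUSTMENT_COLUMNS, DataFrameLoader

dates = pd.date_range("2014-01-02", periods=4)
sids = pd.Index([1, 2], dtype="int64")

# Baseline values: one row per date, one column per sid.
baseline = pd.DataFrame(
    np.arange(8, dtype="float64").reshape(4, 2),
    index=dates,
    columns=sids,
)

# One multiplicative adjustment: halve sid 1's values over the rows for
# dates[0] through dates[1], taking effect as of dates[2].
adjustments = pd.DataFrame(
    [[1, 0.5, MULTIPLY, dates[0], dates[1], dates[2]]],
    columns=ADJUSTMENT_COLUMNS,
)

loader = DataFrameLoader(
    column=USEquityPricing.close,
    baseline=baseline,
    adjustments=adjustments,
)

# Maps the integer position of each apply_date within `dates` to the list
# of Adjustment objects taking effect on that date.
loader.format_adjustments(dates, sids)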