import numpy as np
import pandas as pd
from interface import implements
from toolz import groupby, merge

from .base import PipelineLoader
from zipline.pipeline.common import (
    EVENT_DATE_FIELD_NAME,
    SID_FIELD_NAME,
    TS_FIELD_NAME,
)
from zipline.pipeline.loaders.frame import DataFrameLoader
from zipline.pipeline.loaders.utils import (
    next_event_indexer,
    previous_event_indexer,
)

def required_event_fields(next_value_columns, previous_value_columns):
    """
    Compute the set of resource columns required to serve
    ``next_value_columns`` and ``previous_value_columns``.
    """
    # These metadata columns are used to align event indexers.
    return {TS_FIELD_NAME, SID_FIELD_NAME, EVENT_DATE_FIELD_NAME}.union(
        # We also expect any of the field names that our loadable columns
        # are mapped to.
        next_value_columns.values(),
        previous_value_columns.values(),
    )

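# For example (a sketch with hypothetical column maps): the required set is
# the three metadata fields plus every mapped raw field name. The dict keys
# would normally be BoundColumns, but only the values matter here.
def _required_event_fields_example():
    required = required_event_fields({"next_col": "x"}, {"prev_col": "y"})
    assert required == {
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        EVENT_DATE_FIELD_NAME,
        "x",
        "y",
    }
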
def validate_column_specs(events, next_value_columns, previous_value_columns):
    """
    Verify that the columns of ``events`` can be used by an EventsLoader to
    serve the BoundColumns described by ``next_value_columns`` and
    ``previous_value_columns``.
    """
    required = required_event_fields(next_value_columns, previous_value_columns)
    received = set(events.columns)
    missing = required - received
    if missing:
        raise ValueError(
            "EventsLoader missing required columns {missing}.\n"
            "Got Columns: {received}\n"
            "Expected Columns: {required}".format(
                missing=sorted(missing),
                received=sorted(received),
                required=sorted(required),
            )
        )

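# A minimal sketch of how the validation behaves on a toy events frame
# (illustrative only, not part of zipline's API). As above, the dict keys
# would normally be BoundColumns, but only the mapped raw field names (the
# dict values) matter to this check.
def _validate_column_specs_example():
    events = pd.DataFrame(
        {
            SID_FIELD_NAME: [1],
            TS_FIELD_NAME: pd.to_datetime(["2014-01-05"]),
            EVENT_DATE_FIELD_NAME: pd.to_datetime(["2014-01-10"]),
            "estimate": [1.5],
        }
    )
    # Passes: all three metadata columns plus the mapped raw field
    # ("estimate") are present in ``events``.
    validate_column_specs(events, {"next_col": "estimate"}, {})

    # Raises: "surprise" is mapped to but missing from ``events``.
    try:
        validate_column_specs(events, {"next_col": "surprise"}, {})
    except ValueError:
        pass
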
class EventsLoader(implements(PipelineLoader)):
    """
    Base class for PipelineLoaders that support loading the next and
    previous value of an event field.

    Does not currently support adjustments.

    Parameters
    ----------
    events : pd.DataFrame
        A DataFrame representing events (e.g. share buybacks or
        earnings announcements) associated with particular companies.

        ``events`` must contain at least three columns::

            sid : int64
                The asset id associated with each event.

            event_date : datetime64[ns]
                The date on which the event occurred.

            timestamp : datetime64[ns]
                The date on which we learned about the event.

    next_value_columns : dict[BoundColumn -> str]
        Map from dataset columns to raw field names that should be used when
        searching for a next event value.

    previous_value_columns : dict[BoundColumn -> str]
        Map from dataset columns to raw field names that should be used when
        searching for a previous event value.
    """
    def __init__(self, events, next_value_columns, previous_value_columns):
        validate_column_specs(
            events,
            next_value_columns,
            previous_value_columns,
        )
        events = events[events[EVENT_DATE_FIELD_NAME].notnull()]

        # We always work with entries from ``events`` directly as numpy
        # arrays, so we coerce from a frame to a dict of arrays here.
        self.events = {
            name: np.asarray(series)
            for name, series in events.sort_values(EVENT_DATE_FIELD_NAME).items()
        }

        # Columns to load with self.load_next_events.
        self.next_value_columns = next_value_columns

        # Columns to load with self.load_previous_events.
        self.previous_value_columns = previous_value_columns
    def split_next_and_previous_event_columns(self, requested_columns):
        """
        Split requested columns into columns that should load the next known
        value and columns that should load the previous known value.

        Parameters
        ----------
        requested_columns : iterable[BoundColumn]

        Returns
        -------
        next_cols, previous_cols : iterable[BoundColumn], iterable[BoundColumn]
            ``requested_columns``, partitioned into sub-sequences based on
            whether the column should produce values from the next event or
            the previous event.
        """

        def next_or_previous(c):
            if c in self.next_value_columns:
                return "next"
            elif c in self.previous_value_columns:
                return "previous"
            raise ValueError(
                "{c} not found in next_value_columns "
                "or previous_value_columns".format(c=c)
            )

        groups = groupby(next_or_previous, requested_columns)
        return groups.get("next", ()), groups.get("previous", ())
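    # Example for ``split_next_and_previous_event_columns`` above: if
    # ``A in self.next_value_columns`` and ``B in self.previous_value_columns``,
    # then requesting ``[A, B]`` yields ``([A], [B])``.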
    def next_event_indexer(self, dates, data_query_cutoff, sids):
        return next_event_indexer(
            dates,
            data_query_cutoff,
            sids,
            self.events[EVENT_DATE_FIELD_NAME],
            self.events[TS_FIELD_NAME],
            self.events[SID_FIELD_NAME],
        )

    def previous_event_indexer(self, data_query_time, sids):
        return previous_event_indexer(
            data_query_time,
            sids,
            self.events[EVENT_DATE_FIELD_NAME],
            self.events[TS_FIELD_NAME],
            self.events[SID_FIELD_NAME],
        )
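    # ``next_event_indexer`` and ``previous_event_indexer`` both return an
    # int array of shape (len(dates), len(sids)); each entry indexes into
    # the per-field arrays in ``self.events``, or is -1 where no event is
    # known for that sid/date pair (see ``_load_events`` below).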
    def load_next_events(self, domain, columns, dates, data_query_time, sids, mask):
        if not columns:
            return {}

        return self._load_events(
            name_map=self.next_value_columns,
            indexer=self.next_event_indexer(dates, data_query_time, sids),
            domain=domain,
            columns=columns,
            dates=dates,
            sids=sids,
            mask=mask,
        )

    def load_previous_events(self, domain, columns, dates, data_query_time, sids, mask):
        if not columns:
            return {}

        return self._load_events(
            name_map=self.previous_value_columns,
            indexer=self.previous_event_indexer(data_query_time, sids),
            domain=domain,
            columns=columns,
            dates=dates,
            sids=sids,
            mask=mask,
        )
    def _load_events(self, name_map, indexer, domain, columns, dates, sids, mask):
        def to_frame(array):
            return pd.DataFrame(array, index=dates, columns=sids)

        assert indexer.shape == (len(dates), len(sids))

        out = {}
        for c in columns:
            # Array holding the value for column `c` for every event we have.
            col_array = self.events[name_map[c]]

            if not len(col_array):
                # We don't have **any** events, so return c.missing_value
                # every day for every sid. We have to special-case empty
                # events because in the normal branch we depend on being
                # able to index with -1 for missing values, which fails if
                # there are no events at all.
                raw = np.full(
                    (len(dates), len(sids)),
                    c.missing_value,
                    dtype=c.dtype,
                )
            else:
                # Slot event values into sid/date locations using `indexer`.
                # This produces a 2D array of the same shape as `indexer`,
                # which must be (len(dates), len(sids)).
                raw = col_array[indexer]

                # `indexer` will be -1 for locations where we don't have a
                # known value. Overwrite those locations with
                # c.missing_value.
                raw[indexer < 0] = c.missing_value

            # Delegate the actual array formatting logic to a
            # DataFrameLoader.
            loader = DataFrameLoader(c, to_frame(raw), adjustments=None)
            out[c] = loader.load_adjusted_array(
                domain,
                [c],
                dates,
                sids,
                mask,
            )[c]

        return out
    def load_adjusted_array(self, domain, columns, dates, sids, mask):
        data_query = domain.data_query_cutoff_for_sessions(dates)
        n, p = self.split_next_and_previous_event_columns(columns)
        return merge(
            self.load_next_events(domain, n, dates, data_query, sids, mask),
            self.load_previous_events(domain, p, dates, data_query, sids, mask),
        )

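# A minimal end-to-end usage sketch (illustrative, not part of zipline's
# API): in real code the dict keys below would be BoundColumns such as
# ``MyEventDataSet.next_value``; plain strings are used here only because
# the loader treats the keys opaquely.
def _events_loader_example():
    events = pd.DataFrame(
        {
            SID_FIELD_NAME: [1, 1],
            TS_FIELD_NAME: pd.to_datetime(["2014-01-02", "2014-01-06"]),
            EVENT_DATE_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-10"]),
            "raw_value": [10.0, 20.0],
        }
    )
    loader = EventsLoader(
        events,
        next_value_columns={"next_value_col": "raw_value"},
        previous_value_columns={"previous_value_col": "raw_value"},
    )
    # Event values are stored as a dict of numpy arrays, sorted by
    # event_date.
    assert loader.events["raw_value"].tolist() == [10.0, 20.0]
    return loader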