import abc
from collections import namedtuple, OrderedDict
from itertools import repeat
from textwrap import dedent
from weakref import WeakKeyDictionary
from toolz import first
from zipline.currency import Currency
from zipline.data.fx import DEFAULT_FX_RATE
from zipline.pipeline.classifiers import Classifier, Latest as LatestClassifier
from zipline.pipeline.domain import Domain, GENERIC
from zipline.pipeline.factors import Factor, Latest as LatestFactor
from zipline.pipeline.filters import Filter, Latest as LatestFilter
from zipline.pipeline.sentinels import NotSpecified, sentinel
from zipline.pipeline.term import (
AssetExists,
LoadableTerm,
validate_dtype,
)
from zipline.utils.formatting import s, plural
from zipline.utils.input_validation import (
coerce_types,
ensure_dtype,
expect_types,
)
from zipline.utils.numpy_utils import float64_dtype, NoDefaultMissingValue
from zipline.utils.preprocess import preprocess
from zipline.utils.string_formatting import bulleted_list
IsSpecialization = sentinel("IsSpecialization")
class Column:
"""
An abstract column of data, not yet associated with a dataset.
"""
@preprocess(dtype=ensure_dtype)
def __init__(
self,
dtype,
missing_value=NotSpecified,
doc=None,
metadata=None,
currency_aware=False,
):
if currency_aware and dtype != float64_dtype:
raise ValueError(
"Columns cannot be constructed with currency_aware={}, "
"dtype={}. Currency aware columns must have a float64 dtype.".format(
currency_aware, dtype
)
)
self.dtype = dtype
self.missing_value = missing_value
self.doc = doc
self.metadata = metadata.copy() if metadata is not None else {}
self.currency_aware = currency_aware
    def bind(self, name):
"""
Bind a `Column` object to its name.
"""
return _BoundColumnDescr(
dtype=self.dtype,
missing_value=self.missing_value,
name=name,
doc=self.doc,
metadata=self.metadata,
currency_aware=self.currency_aware,
)
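# A hedged usage sketch (not part of the library): ``Column`` instances are
# declared as class-level attributes of a ``DataSet`` subclass, and
# ``DataSetMeta`` below calls ``bind`` on each of them. The dataset and
# column names here are hypothetical.
#
#     class VendorFundamentals(DataSet):
#         revenue = Column(float)                      # floats default to NaN
#         sector_code = Column(int, missing_value=-1)  # ints need a missing_value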
class _BoundColumnDescr:
"""
Intermediate class that sits on `DataSet` objects and returns memoized
`BoundColumn` objects when requested.
This exists so that subclasses of DataSets don't share columns with their
parent classes.
"""
def __init__(self, dtype, missing_value, name, doc, metadata, currency_aware):
# Validating and calculating default missing values here guarantees
        # that we fail quickly if the user passes an unsupported dtype or fails
# to provide a missing value for a dtype that requires one
# (e.g. int64), but still enables us to provide an error message that
# points to the name of the failing column.
try:
self.dtype, self.missing_value = validate_dtype(
termname="Column(name={name!r})".format(name=name),
dtype=dtype,
missing_value=missing_value,
)
except NoDefaultMissingValue as exc:
# Re-raise with a more specific message.
raise NoDefaultMissingValue(
"Failed to create Column with name {name!r} and"
" dtype {dtype} because no missing_value was provided\n\n"
"Columns with dtype {dtype} require a missing_value.\n"
"Please pass missing_value to Column() or use a different"
" dtype.".format(dtype=dtype, name=name)
) from exc
self.name = name
self.doc = doc
self.metadata = metadata
self.currency_aware = currency_aware
def __get__(self, instance, owner):
"""
Produce a concrete BoundColumn object when accessed.
We don't bind to datasets at class creation time so that subclasses of
DataSets produce different BoundColumns.
"""
return BoundColumn(
dtype=self.dtype,
missing_value=self.missing_value,
dataset=owner,
name=self.name,
doc=self.doc,
metadata=self.metadata,
currency_conversion=None,
currency_aware=self.currency_aware,
)
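# A minimal sketch of the descriptor at work, reusing the hypothetical
# ``VendorFundamentals`` dataset from above. Because ``BoundColumn`` is a
# ``Term``, instances are memoized on their static identity, so repeated
# attribute access should yield the same object:
#
#     col = VendorFundamentals.revenue   # invokes _BoundColumnDescr.__get__
#     col is VendorFundamentals.revenue  # True: memoized by static identity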
class BoundColumn(LoadableTerm):
"""
A column of data that's been concretely bound to a particular dataset.
Attributes
----------
dtype : numpy.dtype
The dtype of data produced when this column is loaded.
latest : zipline.pipeline.LoadableTerm
A :class:`~zipline.pipeline.Filter`, :class:`~zipline.pipeline.Factor`,
or :class:`~zipline.pipeline.Classifier` computing the most recently
known value of this column on each date.
See :class:`zipline.pipeline.mixins.LatestMixin` for more details.
dataset : zipline.pipeline.data.DataSet
The dataset to which this column is bound.
name : str
The name of this column.
metadata : dict
Extra metadata associated with this column.
currency_aware : bool
Whether or not this column produces currency-denominated data.
Notes
-----
Instances of this class are dynamically created upon access to attributes
of :class:`~zipline.pipeline.data.DataSet`. For example,
:attr:`~zipline.pipeline.data.EquityPricing.close` is an instance of this
    class. Pipeline API users should never construct instances of this class
    directly.
"""
mask = AssetExists()
window_safe = True
def __new__(
cls,
dtype,
missing_value,
dataset,
name,
doc,
metadata,
currency_conversion,
currency_aware,
):
if currency_aware and dtype != float64_dtype:
raise AssertionError(
"The {} column on dataset {} cannot be constructed with "
"currency_aware={}, dtype={}. Currency aware columns must "
"have a float64 dtype.".format(
name,
dataset,
currency_aware,
dtype,
)
)
return super(BoundColumn, cls).__new__(
cls,
domain=dataset.domain,
dtype=dtype,
missing_value=missing_value,
dataset=dataset,
name=name,
ndim=dataset.ndim,
doc=doc,
metadata=metadata,
currency_conversion=currency_conversion,
currency_aware=currency_aware,
)
def _init(
self,
dataset,
name,
doc,
metadata,
currency_conversion,
currency_aware,
*args,
**kwargs,
):
self._dataset = dataset
self._name = name
self.__doc__ = doc
self._metadata = metadata
self._currency_conversion = currency_conversion
self._currency_aware = currency_aware
return super(BoundColumn, self)._init(*args, **kwargs)
@classmethod
def _static_identity(
cls,
dataset,
name,
doc,
metadata,
currency_conversion,
currency_aware,
*args,
**kwargs,
):
return (
super(BoundColumn, cls)._static_identity(*args, **kwargs),
dataset,
name,
doc,
frozenset(sorted(metadata.items(), key=first)),
currency_conversion,
currency_aware,
)
def __lt__(self, other):
msg = "Can't compare '{}' with '{}'. (Did you mean to use '.latest'?)"
raise TypeError(msg.format(self.qualname, other.__class__.__name__))
__gt__ = __le__ = __ge__ = __lt__
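    # A short sketch of the mistake this guard catches, using the built-in
    # ``EquityPricing`` dataset:
    #
    #     EquityPricing.close < 10         # raises TypeError
    #     EquityPricing.close.latest < 10  # correct: produces a Filter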
def _replace(self, **kwargs):
kw = dict(
dtype=self.dtype,
missing_value=self.missing_value,
dataset=self._dataset,
name=self._name,
doc=self.__doc__,
metadata=self._metadata,
currency_conversion=self._currency_conversion,
currency_aware=self._currency_aware,
)
kw.update(kwargs)
return type(self)(**kw)
    def specialize(self, domain):
"""Specialize ``self`` to a concrete domain."""
if domain == self.domain:
return self
return self._replace(dataset=self._dataset.specialize(domain))
    def unspecialize(self):
"""
Unspecialize a column to its generic form.
This is equivalent to ``column.specialize(GENERIC)``.
"""
return self.specialize(GENERIC)
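    # A hedged sketch, assuming the built-in ``EquityPricing`` dataset and the
    # ``US_EQUITIES`` domain. Round-tripping through ``unspecialize`` should
    # return the memoized generic column:
    #
    #     from zipline.pipeline.domain import US_EQUITIES
    #     us_close = EquityPricing.close.specialize(US_EQUITIES)
    #     us_close.unspecialize() is EquityPricing.close  # True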
    @coerce_types(currency=(str, Currency))
def fx(self, currency):
"""
Construct a currency-converted version of this column.
Parameters
----------
currency : str or zipline.currency.Currency
Currency into which to convert this column's data.
Returns
-------
column : BoundColumn
Column producing the same data as ``self``, but currency-converted
into ``currency``.
"""
conversion = self._currency_conversion
if not self._currency_aware:
raise TypeError(
"The .fx() method cannot be called on {} because it does not "
"produce currency-denominated data.".format(self.qualname)
)
elif conversion is not None and conversion.currency == currency:
return self
return self._replace(
currency_conversion=CurrencyConversion(
currency=currency,
field=DEFAULT_FX_RATE,
)
)
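    # A usage sketch, assuming ``EquityPricing.close`` is defined with
    # ``currency_aware=True`` (as in the built-in pricing dataset):
    #
    #     usd_close = EquityPricing.close.fx("USD")
    #     usd_close.qualname  # e.g. "EquityPricing.close.fx('USD')"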
@property
def currency_conversion(self):
"""Specification for currency conversions applied for this term."""
return self._currency_conversion
@property
def currency_aware(self):
"""
Whether or not this column produces currency-denominated data.
"""
return self._currency_aware
@property
def dataset(self):
"""
The dataset to which this column is bound.
"""
return self._dataset
@property
def name(self):
"""
The name of this column.
"""
return self._name
@property
def metadata(self):
"""
A copy of the metadata for this column.
"""
return self._metadata.copy()
@property
def qualname(self):
"""The fully-qualified name of this column."""
out = ".".join([self.dataset.qualname, self.name])
conversion = self._currency_conversion
if conversion is not None:
out += ".fx({!r})".format(conversion.currency.code)
return out
@property
def latest(self):
dtype = self.dtype
if dtype in Filter.ALLOWED_DTYPES:
Latest = LatestFilter
elif dtype in Classifier.ALLOWED_DTYPES:
Latest = LatestClassifier
else:
assert dtype in Factor.ALLOWED_DTYPES, "Unknown dtype %s." % dtype
Latest = LatestFactor
return Latest(
inputs=(self,),
dtype=dtype,
missing_value=self.missing_value,
ndim=self.ndim,
)
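    # A hedged sketch of the dtype-based dispatch above. A float64 column
    # yields a ``LatestFactor``, a bool column a ``LatestFilter``, and an
    # int64/categorical column a ``LatestClassifier``; ``CompanyMetadata`` is
    # the hypothetical dataset from the DataSet docstring below:
    #
    #     EquityPricing.close.latest               # Factor (float64)
    #     CompanyMetadata.is_primary_share.latest  # Filter (bool)
    #     CompanyMetadata.sector_code.latest       # Classifier (int64)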
def __repr__(self):
return "{qualname}::{dtype}".format(
qualname=self.qualname,
dtype=self.dtype.name,
)
    def graph_repr(self):
"""Short repr to use when rendering Pipeline graphs."""
# Graphviz interprets `\l` as "divide label into lines, left-justified"
return "BoundColumn:\\l Dataset: {}\\l Column: {}\\l".format(
self.dataset.__name__, self.name
)
    def recursive_repr(self):
"""Short repr used to render in recursive contexts."""
return self.qualname
class DataSetMeta(type):
"""
Metaclass for DataSets
Supplies name and dataset information to Column attributes, and manages
    families of specialized datasets.
"""
def __new__(metacls, name, bases, dict_):
if len(bases) != 1:
# Disallowing multiple inheritance makes it easier for us to
# determine whether a given dataset is the root for its family of
# specializations.
raise TypeError("Multiple dataset inheritance is not supported.")
# This marker is set in the class dictionary by `specialize` below.
is_specialization = dict_.pop(IsSpecialization, False)
newtype = super(DataSetMeta, metacls).__new__(metacls, name, bases, dict_)
if not isinstance(newtype.domain, Domain):
raise TypeError(
"Expected a Domain for {}.domain, but got {} instead.".format(
newtype.__name__,
type(newtype.domain),
)
)
# Collect all of the column names that we inherit from our parents.
column_names = set().union(
*(getattr(base, "_column_names", ()) for base in bases)
)
# Collect any new columns from this dataset.
for maybe_colname, maybe_column in dict_.items():
if isinstance(maybe_column, Column):
# add column names defined on our class
bound_column_descr = maybe_column.bind(maybe_colname)
setattr(newtype, maybe_colname, bound_column_descr)
column_names.add(maybe_colname)
newtype._column_names = frozenset(column_names)
if not is_specialization:
            # This is the new root of a family of specializations. Store the
            # memoized dictionary for this family on this type.
newtype._domain_specializations = WeakKeyDictionary(
{
newtype.domain: newtype,
}
)
return newtype
@expect_types(domain=Domain)
def specialize(cls, domain):
"""
Specialize a generic DataSet to a concrete domain.
Parameters
----------
domain : zipline.pipeline.domain.Domain
Domain to which we should generate a specialization.
Returns
-------
specialized : type
A new :class:`~zipline.pipeline.data.DataSet` subclass with the
same columns as ``self``, but specialized to ``domain``.
"""
# We're already the specialization to this domain, so just return self.
if domain == cls.domain:
return cls
try:
return cls._domain_specializations[domain]
except KeyError as exc:
if not cls._can_create_new_specialization(domain):
# This either means we're already a specialization and trying
# to create a new specialization, or we're the generic version
# of a root-specialized dataset, which we don't want to create
# new specializations of.
raise ValueError(
"Can't specialize {dataset} from {current} to new domain {new}.".format(
dataset=cls.__name__,
current=cls.domain,
new=domain,
)
) from exc
new_type = cls._create_specialization(domain)
cls._domain_specializations[domain] = new_type
return new_type
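    # A short sketch of the memoization above, using the built-in
    # ``EquityPricing`` dataset:
    #
    #     from zipline.pipeline.domain import US_EQUITIES
    #     us = EquityPricing.specialize(US_EQUITIES)
    #     us is EquityPricing.specialize(US_EQUITIES)  # True: cached lookup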
def unspecialize(cls):
"""
Unspecialize a dataset to its generic form.
This is equivalent to ``dataset.specialize(GENERIC)``.
"""
return cls.specialize(GENERIC)
def _can_create_new_specialization(cls, domain):
# Always allow specializing to a generic domain.
if domain is GENERIC:
return True
elif "_domain_specializations" in vars(cls):
# This branch is True if we're the root of a family.
# Allow specialization if we're generic.
return cls.domain is GENERIC
else:
# If we're not the root of a family, we can't create any new
# specializations.
return False
def _create_specialization(cls, domain):
# These are all assertions because we should have handled these cases
# already in specialize().
assert isinstance(domain, Domain)
assert (
domain not in cls._domain_specializations
), "Domain specializations should be memoized!"
if domain is not GENERIC:
assert (
cls.domain is GENERIC
), "Can't specialize dataset with domain {} to domain {}.".format(
cls.domain,
domain,
)
# Create a new subclass of ``self`` with the given domain.
# Mark that it's a specialization so that we know not to create a new
# family for it.
name = cls.__name__
bases = (cls,)
dict_ = {"domain": domain, IsSpecialization: True}
out = type(name, bases, dict_)
out.__module__ = cls.__module__
return out
@property
def columns(cls):
return frozenset(getattr(cls, colname) for colname in cls._column_names)
@property
def qualname(cls):
if cls.domain is GENERIC:
specialization_key = ""
else:
specialization_key = "<" + cls.domain.country_code + ">"
return cls.__name__ + specialization_key
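    # For example (a hedged sketch with the built-in pricing dataset):
    #
    #     EquityPricing.qualname                          # "EquityPricing"
    #     EquityPricing.specialize(US_EQUITIES).qualname  # "EquityPricing<US>"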
# NOTE: We used to use `functools.total_ordering` to account for all of the
# other rich comparison methods, but it has issues in python 3 and
# this method is only used for test purposes, so for now we will just
# keep this in isolation. If we ever need any of the other comparison
# methods we will have to implement them individually.
def __lt__(cls, other):
return id(cls) < id(other)
def __repr__(cls):
return "<DataSet: %r, domain=%s>" % (cls.__name__, cls.domain)
class DataSet(object, metaclass=DataSetMeta):
"""
Base class for Pipeline datasets.
A :class:`DataSet` is defined by two parts:
1. A collection of :class:`~zipline.pipeline.data.Column` objects that
describe the queryable attributes of the dataset.
2. A :class:`~zipline.pipeline.domain.Domain` describing the assets and
calendar of the data represented by the :class:`DataSet`.
To create a new Pipeline dataset, define a subclass of :class:`DataSet` and
set one or more :class:`Column` objects as class-level attributes. Each
column requires a ``np.dtype`` that describes the type of data that should
be produced by a loader for the dataset. Integer columns must also provide
a "missing value" to be used when no value is available for a given
asset/date combination.
By default, the domain of a dataset is the special singleton value,
:data:`~zipline.pipeline.domain.GENERIC`, which means that they can be used
in a Pipeline running on **any** domain.
    In some cases, it may be preferable to restrict a dataset to support only
    a single domain. For example, a DataSet may describe data from a
vendor that only covers the US. To restrict a dataset to a specific domain,
define a `domain` attribute at class scope.
You can also define a domain-specific version of a generic DataSet by
calling its ``specialize`` method with the domain of interest.
Examples
--------
The built-in EquityPricing dataset is defined as follows::
class EquityPricing(DataSet):
open = Column(float)
high = Column(float)
low = Column(float)
close = Column(float)
volume = Column(float)
The built-in USEquityPricing dataset is a specialization of
EquityPricing. It is defined as::
from zipline.pipeline.domain import US_EQUITIES
USEquityPricing = EquityPricing.specialize(US_EQUITIES)
Columns can have types other than float. A dataset containing assorted
company metadata might be defined like this::
class CompanyMetadata(DataSet):
# Use float for semantically-numeric data, even if it's always
# integral valued (see Notes section below). The default missing
# value for floats is NaN.
shares_outstanding = Column(float)
# Use object for string columns. The default missing value for
# object-dtype columns is None.
ticker = Column(object)
# Use integers for integer-valued categorical data like sector or
# industry codes. Integer-dtype columns require an explicit missing
# value.
sector_code = Column(int, missing_value=-1)
# Use bool for boolean-valued flags. Note that the default missing
# value for bool-dtype columns is False.
is_primary_share = Column(bool)
Notes
-----
Because numpy has no native support for integers with missing values, users
are strongly encouraged to use floats for any data that's semantically
numeric. Doing so enables the use of `NaN` as a natural missing value,
which has useful propagation semantics.
"""
domain = GENERIC
ndim = 2
    @classmethod
def get_column(cls, name):
"""Look up a column by name.
Parameters
----------
name : str
Name of the column to look up.
Returns
-------
column : zipline.pipeline.data.BoundColumn
Column with the given name.
Raises
------
AttributeError
If no column with the given name exists.
"""
clsdict = vars(cls)
try:
maybe_column = clsdict[name]
if not isinstance(maybe_column, _BoundColumnDescr):
raise KeyError(name)
except KeyError as exc:
raise AttributeError(
"{dset} has no column {colname!r}:\n\n"
"Possible choices are:\n"
"{choices}".format(
dset=cls.qualname,
colname=name,
choices=bulleted_list(
sorted(cls._column_names),
max_count=10,
),
)
) from exc
# Resolve column descriptor into a BoundColumn.
return maybe_column.__get__(None, cls)
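# A usage sketch for ``get_column``, assuming the built-in ``EquityPricing``
# dataset; the misspelled name is deliberate:
#
#     EquityPricing.get_column("close")   # returns the BoundColumn
#     EquityPricing.get_column("cloze")   # AttributeError listing valid choices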
# This attribute is set by DataSetMeta to mark that a class is the root of a
# family of datasets with different domains. We don't want that behavior for the
# base DataSet class, and we also don't want to accidentally use a shared
# version of this attribute if we fail to set this in a subclass somewhere.
del DataSet._domain_specializations
class DataSetFamilyLookupError(AttributeError):
"""Exception thrown when a column is accessed on a DataSetFamily
instead of on the result of a slice.
Parameters
----------
family_name : str
The name of the DataSetFamily on which the access occurred.
column_name : str
The name of the column accessed.
"""
def __init__(self, family_name, column_name):
self.family_name = family_name
self.column_name = column_name
def __str__(self):
# NOTE: when ``aggregate`` is added, remember to update this message
return dedent(
"""\
Attempted to access column {c} from DataSetFamily {d}:
To work with dataset families, you must first select a
slice using the ``slice`` method:
{d}.slice(...).{c}
""".format(
c=self.column_name, d=self.family_name
)
)
class _DataSetFamilyColumn:
"""Descriptor used to raise a helpful error when a column is accessed on a
DataSetFamily instead of on the result of a slice.
Parameters
----------
    column_name : str
The name of the column.
"""
def __init__(self, column_name):
self.column_name = column_name
def __get__(self, instance, owner):
raise DataSetFamilyLookupError(
owner.__name__,
self.column_name,
)
class DataSetFamilyMeta(abc.ABCMeta):
def __new__(cls, name, bases, dict_):
columns = {}
for k, v in dict_.items():
if isinstance(v, Column):
# capture all the columns off the DataSetFamily class
# and replace them with a descriptor that will raise a helpful
# error message. The columns will get added to the BaseSlice
# for this type.
columns[k] = v
dict_[k] = _DataSetFamilyColumn(k)
is_abstract = dict_.pop("_abstract", False)
self = super(DataSetFamilyMeta, cls).__new__(
cls,
name,
bases,
dict_,
)
if not is_abstract:
self.extra_dims = extra_dims = OrderedDict(
[(k, frozenset(v)) for k, v in OrderedDict(self.extra_dims).items()]
)
if not extra_dims:
raise ValueError(
"DataSetFamily must be defined with non-empty"
" extra_dims, or with `_abstract = True`",
)
class BaseSlice(self._SliceType):
dataset_family = self
ndim = self.slice_ndim
domain = self.domain
locals().update(columns)
BaseSlice.__name__ = "%sBaseSlice" % self.__name__
self._SliceType = BaseSlice
# each type gets a unique cache
self._slice_cache = {}
return self
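    # A hedged sketch of the ``_abstract`` escape hatch handled above: an
    # intermediate family that defines no ``extra_dims`` of its own must opt
    # out of validation. Both class names are hypothetical.
    #
    #     class VendorFamilyBase(DataSetFamily):
    #         _abstract = True  # no extra_dims required here
    #
    #     class VendorEstimates(VendorFamilyBase):
    #         extra_dims = [("fiscal_quarter", {1, 2, 3, 4})]
    #         eps = Column(float)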
def __repr__(self):
return "<DataSetFamily: %r, extra_dims=%r>" % (
self.__name__,
list(self.extra_dims),
)
class DataSetFamilySlice(DataSet):
"""Marker type for slices of a
:class:`zipline.pipeline.data.dataset.DataSetFamily` objects
"""
# XXX: This docstring was mostly written when the abstraction here was
# "MultiDimensionalDataSet". It probably needs some rewriting.
class DataSetFamily(metaclass=DataSetFamilyMeta):
"""
Base class for Pipeline dataset families.
Dataset families are used to represent data where the unique identifier for
a row requires more than just asset and date coordinates. A
:class:`DataSetFamily` can also be thought of as a collection of
:class:`~zipline.pipeline.data.DataSet` objects, each of which has the same
columns, domain, and ndim.
:class:`DataSetFamily` objects are defined with one or more
:class:`~zipline.pipeline.data.Column` objects, plus one additional field:
``extra_dims``.
The ``extra_dims`` field defines coordinates other than asset and date that
must be fixed to produce a logical timeseries. The column objects determine
columns that will be shared by slices of the family.
``extra_dims`` are represented as an ordered dictionary where the keys are
the dimension name, and the values are a set of unique values along that
dimension.
To work with a :class:`DataSetFamily` in a pipeline expression, one must
choose a specific value for each of the extra dimensions using the
:meth:`~zipline.pipeline.data.DataSetFamily.slice` method.
For example, given a :class:`DataSetFamily`:
.. code-block:: python
class SomeDataSet(DataSetFamily):
extra_dims = [
('dimension_0', {'a', 'b', 'c'}),
('dimension_1', {'d', 'e', 'f'}),
]
column_0 = Column(float)
column_1 = Column(bool)
This dataset might represent a table with the following columns:
::
sid :: int64
asof_date :: datetime64[ns]
timestamp :: datetime64[ns]
dimension_0 :: str
dimension_1 :: str
column_0 :: float64
column_1 :: bool
Here we see the implicit ``sid``, ``asof_date`` and ``timestamp`` columns
as well as the extra dimensions columns.
This :class:`DataSetFamily` can be converted to a regular :class:`DataSet`
with:
.. code-block:: python
DataSetSlice = SomeDataSet.slice(dimension_0='a', dimension_1='e')
This sliced dataset represents the rows from the higher dimensional dataset
where ``(dimension_0 == 'a') & (dimension_1 == 'e')``.
"""
_abstract = True # Removed by metaclass
domain = GENERIC
slice_ndim = 2
_SliceType = DataSetFamilySlice
@type.__call__
class extra_dims:
"""OrderedDict[str, frozenset] of dimension name -> unique values
May be defined on subclasses as an iterable of pairs: the
metaclass converts this attribute to an OrderedDict.
"""
__isabstractmethod__ = True
def __get__(self, instance, owner):
return []
@classmethod
def _canonical_key(cls, args, kwargs):
extra_dims = cls.extra_dims
dimensions_set = set(extra_dims)
if not set(kwargs) <= dimensions_set:
extra = sorted(set(kwargs) - dimensions_set)
raise TypeError(
"%s does not have the following %s: %s\n"
"Valid dimensions are: %s"
% (
cls.__name__,
s("dimension", extra),
", ".join(extra),
", ".join(extra_dims),
),
)
if len(args) > len(extra_dims):
raise TypeError(
"%s has %d extra %s but %d %s given"
% (
cls.__name__,
len(extra_dims),
s("dimension", extra_dims),
len(args),
plural("was", "were", args),
),
)
missing = object()
coords = OrderedDict(zip(extra_dims, repeat(missing)))
to_add = dict(zip(extra_dims, args))
coords.update(to_add)
added = set(to_add)
for key, value in kwargs.items():
if key in added:
                raise TypeError(
                    "%s got multiple values for dimension %r"
                    % (
                        cls.__name__,
                        key,
                    ),
                )
coords[key] = value
added.add(key)
missing = {k for k, v in coords.items() if v is missing}
if missing:
missing = sorted(missing)
raise TypeError(
"no coordinate provided to %s for the following %s: %s"
% (
cls.__name__,
s("dimension", missing),
", ".join(missing),
),
)
# validate that all of the provided values exist along their given
# dimensions
for key, value in coords.items():
if value not in cls.extra_dims[key]:
raise ValueError(
"%r is not a value along the %s dimension of %s"
% (
value,
key,
cls.__name__,
),
)
return coords, tuple(coords.items())
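    # A hedged sketch of the canonicalization above, reusing the hypothetical
    # ``SomeDataSet`` family from the class docstring:
    #
    #     SomeDataSet._canonical_key(("a",), {"dimension_1": "e"})
    #     # -> (OrderedDict([('dimension_0', 'a'), ('dimension_1', 'e')]),
    #     #     (('dimension_0', 'a'), ('dimension_1', 'e')))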
@classmethod
def _make_dataset(cls, coords):
"""Construct a new dataset given the coordinates."""
class Slice(cls._SliceType):
extra_coords = coords
Slice.__name__ = "%s.slice(%s)" % (
cls.__name__,
", ".join("%s=%r" % item for item in coords.items()),
)
return Slice
    @classmethod
def slice(cls, *args, **kwargs):
"""Take a slice of a DataSetFamily to produce a dataset
indexed by asset and date.
Parameters
----------
*args
**kwargs
The coordinates to fix along each extra dimension.
Returns
-------
dataset : DataSet
A regular pipeline dataset indexed by asset and date.
Notes
-----
The extra dimensions coords used to produce the result are available
under the ``extra_coords`` attribute.
"""
coords, hash_key = cls._canonical_key(args, kwargs)
try:
return cls._slice_cache[hash_key]
except KeyError:
pass
Slice = cls._make_dataset(coords)
cls._slice_cache[hash_key] = Slice
return Slice
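    # A usage sketch, reusing the hypothetical ``SomeDataSet`` family from the
    # class docstring. Positional and keyword coordinates may be mixed, and
    # repeated slices hit the per-family cache:
    #
    #     SliceAE = SomeDataSet.slice("a", dimension_1="e")
    #     SliceAE is SomeDataSet.slice(dimension_0="a", dimension_1="e")  # True
    #     SliceAE.column_0.latest  # usable like any DataSet column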
CurrencyConversion = namedtuple(
"CurrencyConversion",
["currency", "field"],
)
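# A minimal sketch of how ``BoundColumn.fx`` populates this record; ``field``
# is always ``DEFAULT_FX_RATE`` for conversions requested through ``fx``:
#
#     CurrencyConversion(currency=Currency("USD"), field=DEFAULT_FX_RATE)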