# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# import array
# import binascii
# import struct
from abc import ABC
from collections import deque, namedtuple
from functools import partial
from numbers import Integral
from operator import attrgetter, itemgetter
import logging
import numpy as np
import pandas as pd
import sqlalchemy as sa
from toolz import (
compose,
concat,
concatv,
curry,
groupby,
merge,
partition_all,
sliding_window,
valmap,
)
from zipline.errors import (
EquitiesNotFound,
FutureContractsNotFound,
MultipleSymbolsFound,
MultipleSymbolsFoundForFuzzySymbol,
MultipleValuesFoundForField,
MultipleValuesFoundForSid,
NoValueForSid,
SameSymbolUsedAcrossCountries,
SidsNotFound,
SymbolNotFound,
ValueNotFoundForField,
)
from zipline.utils.functional import invert
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import as_column
from zipline.utils.preprocess import preprocess
from zipline.utils.sqlite_utils import coerce_string_to_eng, group_into_chunks
from . import Asset, Equity, Future
from .asset_db_schema import ASSET_DB_VERSION
from .asset_writer import (
SQLITE_MAX_VARIABLE_NUMBER,
asset_db_table_names,
check_version_info,
split_delimited_symbol,
symbol_columns as SYMBOL_COLUMNS,
)
from .continuous_futures import (
ADJUSTMENT_STYLES,
CHAIN_PREDICATES,
ContinuousFuture,
OrderedContracts,
)
from .exchange_info import ExchangeInfo
log = logging.getLogger("assets.py")
# A set of fields that need to be converted to strings before building an
# Asset to avoid unicode fields
# _asset_str_fields = frozenset(
# {
# "symbol",
# "asset_name",
# "exchange",
# }
# )
# A set of fields that need to be converted to timestamps in UTC
_asset_timestamp_fields = frozenset(
{
"start_date",
"end_date",
"first_traded",
"notice_date",
"expiration_date",
"auto_close_date",
}
)
OwnershipPeriod = namedtuple("OwnershipPeriod", "start end sid value")
def merge_ownership_periods(mappings):
"""Given a dict of mappings where the values are lists of
OwnershipPeriod objects, returns a dict with the same structure with
new OwnershipPeriod objects adjusted so that the periods have no
gaps.
Orders the periods chronologically, and pushes forward the end date
of each period to match the start date of the following period. The
end date of the last period pushed forward to the max Timestamp.
"""
return valmap(
lambda v: tuple(
OwnershipPeriod(
a.start,
b.start,
a.sid,
a.value,
)
for a, b in sliding_window(
2,
concatv(
sorted(v),
# concat with a fake ownership object to make the last
# end date be max timestamp
[
OwnershipPeriod(
pd.Timestamp.max,
None,
None,
None,
)
],
),
)
),
mappings,
)
def _build_ownership_map_from_rows(rows, key_from_row, value_from_row):
mappings = {}
for row in rows:
mappings.setdefault(key_from_row(row), [],).append(
OwnershipPeriod(
# TODO FIX TZ MESS
# pd.Timestamp(row.start_date, unit="ns", tz="utc"),
# pd.Timestamp(row.end_date, unit="ns", tz="utc"),
pd.Timestamp(row.start_date, unit="ns", tz=None),
pd.Timestamp(row.end_date, unit="ns", tz=None),
row.sid,
value_from_row(row),
),
)
return merge_ownership_periods(mappings)
def build_ownership_map(conn, table, key_from_row, value_from_row):
"""Builds a dict mapping to lists of OwnershipPeriods, from a db table."""
return _build_ownership_map_from_rows(
conn.execute(sa.select(table.c)).fetchall(),
key_from_row,
value_from_row,
)
def build_grouped_ownership_map(conn, table, key_from_row, value_from_row, group_key):
"""Builds a dict mapping group keys to maps of keys to lists of
OwnershipPeriods, from a db table.
"""
grouped_rows = groupby(
group_key,
conn.execute(sa.select(table.c)).fetchall(),
)
return {
key: _build_ownership_map_from_rows(
rows,
key_from_row,
value_from_row,
)
for key, rows in grouped_rows.items()
}
@curry
def _filter_kwargs(names, dict_):
"""Filter out kwargs from a dictionary.
Parameters
----------
names : set[str]
The names to select from ``dict_``.
dict_ : dict[str, any]
The dictionary to select from.
Returns
-------
kwargs : dict[str, any]
``dict_`` where the keys intersect with ``names`` and the values are
not None.
"""
return {k: v for k, v in dict_.items() if k in names and v is not None}
_filter_future_kwargs = _filter_kwargs(Future._kwargnames)
_filter_equity_kwargs = _filter_kwargs(Equity._kwargnames)
def _convert_asset_timestamp_fields(dict_):
"""Takes in a dict of Asset init args and converts dates to pd.Timestamps"""
for key in _asset_timestamp_fields & dict_.keys():
# TODO FIX TZ MESS
# value = pd.Timestamp(dict_[key], tz="UTC")
value = pd.Timestamp(dict_[key], tz=None)
dict_[key] = None if pd.isnull(value) else value
return dict_
SID_TYPE_IDS = {
# Asset would be 0,
ContinuousFuture: 1,
}
CONTINUOUS_FUTURE_ROLL_STYLE_IDS = {
"calendar": 0,
"volume": 1,
}
CONTINUOUS_FUTURE_ADJUSTMENT_STYLE_IDS = {
None: 0,
"div": 1,
"add": 2,
}
def _encode_continuous_future_sid(root_symbol, offset, roll_style, adjustment_style):
# Generate a unique int identifier
values = (
SID_TYPE_IDS[ContinuousFuture],
offset,
*[ord(x) for x in root_symbol.upper()],
CONTINUOUS_FUTURE_ROLL_STYLE_IDS[roll_style],
CONTINUOUS_FUTURE_ADJUSTMENT_STYLE_IDS[adjustment_style],
)
return int("".join([str(x) for x in values]))
Lifetimes = namedtuple("Lifetimes", "sid start end")
[docs]class AssetFinder:
"""An AssetFinder is an interface to a database of Asset metadata written by
an ``AssetDBWriter``.
This class provides methods for looking up assets by unique integer id or
by symbol. For historical reasons, we refer to these unique ids as 'sids'.
Parameters
----------
engine : str or SQLAlchemy.engine
An engine with a connection to the asset database to use, or a string
that can be parsed by SQLAlchemy as a URI.
future_chain_predicates : dict
A dict mapping future root symbol to a predicate function which accepts
a contract as a parameter and returns whether or not the contract should be
included in the chain.
See Also
--------
:class:`zipline.assets.AssetDBWriter`
"""
@preprocess(engine=coerce_string_to_eng(require_exists=True))
def __init__(self, engine, future_chain_predicates=CHAIN_PREDICATES):
self.engine = engine
metadata_obj = sa.MetaData()
metadata_obj.reflect(engine, only=asset_db_table_names)
for table_name in asset_db_table_names:
setattr(self, table_name, metadata_obj.tables[table_name])
# Check the version info of the db for compatibility
with engine.connect() as conn:
check_version_info(conn, self.version_info, ASSET_DB_VERSION)
# Cache for lookup of assets by sid, the objects in the asset lookup
# may be shared with the results from equity and future lookup caches.
#
# The top level cache exists to minimize lookups on the asset type
# routing.
#
# The caches are read through, i.e. accessing an asset through
# retrieve_asset will populate the cache on first retrieval.
self._asset_cache = {}
self._asset_type_cache = {}
self._caches = (self._asset_cache, self._asset_type_cache)
self._future_chain_predicates = (
future_chain_predicates if future_chain_predicates is not None else {}
)
self._ordered_contracts = {}
# Populated on first call to `lifetimes`.
self._asset_lifetimes = {}
@lazyval
def exchange_info(self):
with self.engine.connect() as conn:
es = conn.execute(sa.select(self.exchanges.c)).fetchall()
return {
name: ExchangeInfo(name, canonical_name, country_code)
for name, canonical_name, country_code in es
}
@lazyval
def symbol_ownership_map(self):
out = {}
for mappings in self.symbol_ownership_maps_by_country_code.values():
for key, ownership_periods in mappings.items():
out.setdefault(key, []).extend(ownership_periods)
return out
@lazyval
def symbol_ownership_maps_by_country_code(self):
with self.engine.connect() as conn:
query = sa.select(
self.equities.c.sid,
self.exchanges.c.country_code,
).where(self.equities.c.exchange == self.exchanges.c.exchange)
sid_to_country_code = dict(conn.execute(query).fetchall())
return build_grouped_ownership_map(
conn,
table=self.equity_symbol_mappings,
key_from_row=(lambda row: (row.company_symbol, row.share_class_symbol)),
value_from_row=lambda row: row.symbol,
group_key=lambda row: sid_to_country_code[row.sid],
)
@lazyval
def country_codes(self):
return tuple(self.symbol_ownership_maps_by_country_code)
@staticmethod
def _fuzzify_symbol_ownership_map(ownership_map):
fuzzy_mappings = {}
for (cs, scs), owners in ownership_map.items():
fuzzy_owners = fuzzy_mappings.setdefault(
cs + scs,
[],
)
fuzzy_owners.extend(owners)
fuzzy_owners.sort()
return fuzzy_mappings
@lazyval
def fuzzy_symbol_ownership_map(self):
return self._fuzzify_symbol_ownership_map(self.symbol_ownership_map)
@lazyval
def fuzzy_symbol_ownership_maps_by_country_code(self):
return valmap(
self._fuzzify_symbol_ownership_map,
self.symbol_ownership_maps_by_country_code,
)
@lazyval
def equity_supplementary_map(self):
with self.engine.connect() as conn:
return build_ownership_map(
conn,
table=self.equity_supplementary_mappings,
key_from_row=lambda row: (row.field, row.value),
value_from_row=lambda row: row.value,
)
@lazyval
def equity_supplementary_map_by_sid(self):
with self.engine.connect() as conn:
return build_ownership_map(
conn,
table=self.equity_supplementary_mappings,
key_from_row=lambda row: (row.field, row.sid),
value_from_row=lambda row: row.value,
)
[docs] def lookup_asset_types(self, sids):
"""Retrieve asset types for a list of sids.
Parameters
----------
sids : list[int]
Returns
-------
types : dict[sid -> str or None]
Asset types for the provided sids.
"""
found = {}
missing = set()
for sid in sids:
try:
found[sid] = self._asset_type_cache[sid]
except KeyError:
missing.add(sid)
if not missing:
return found
router_cols = self.asset_router.c
with self.engine.connect() as conn:
for assets in group_into_chunks(missing):
query = sa.select(router_cols.sid, router_cols.asset_type).where(
self.asset_router.c.sid.in_(map(int, assets))
)
for sid, type_ in conn.execute(query).fetchall():
missing.remove(sid)
found[sid] = self._asset_type_cache[sid] = type_
for sid in missing:
found[sid] = self._asset_type_cache[sid] = None
return found
[docs] def group_by_type(self, sids):
"""Group a list of sids by asset type.
Parameters
----------
sids : list[int]
Returns
-------
types : dict[str or None -> list[int]]
A dict mapping unique asset types to lists of sids drawn from sids.
If we fail to look up an asset, we assign it a key of None.
"""
return invert(self.lookup_asset_types(sids))
[docs] def retrieve_asset(self, sid, default_none=False):
"""
Retrieve the Asset for a given sid.
"""
try:
asset = self._asset_cache[sid]
if asset is None and not default_none:
raise SidsNotFound(sids=[sid])
return asset
except KeyError:
return self.retrieve_all((sid,), default_none=default_none)[0]
[docs] def retrieve_all(self, sids, default_none=False):
"""Retrieve all assets in `sids`.
Parameters
----------
sids : iterable of int
Assets to retrieve.
default_none : bool
If True, return None for failed lookups.
If False, raise `SidsNotFound`.
Returns
-------
assets : list[Asset or None]
A list of the same length as `sids` containing Assets (or Nones)
corresponding to the requested sids.
Raises
------
SidsNotFound
When a requested sid is not found and default_none=False.
"""
sids = list(sids)
hits, missing, failures = {}, set(), []
for sid in sids:
try:
asset = self._asset_cache[sid]
if not default_none and asset is None:
# Bail early if we've already cached that we don't know
# about an asset.
raise SidsNotFound(sids=[sid])
hits[sid] = asset
except KeyError:
missing.add(sid)
# All requests were cache hits. Return requested sids in order.
if not missing:
return [hits[sid] for sid in sids]
update_hits = hits.update
# Look up cache misses by type.
type_to_assets = self.group_by_type(missing)
# Handle failures
failures = {failure: None for failure in type_to_assets.pop(None, ())}
update_hits(failures)
self._asset_cache.update(failures)
if failures and not default_none:
raise SidsNotFound(sids=list(failures))
# We don't update the asset cache here because it should already be
# updated by `self.retrieve_equities`.
update_hits(self.retrieve_equities(type_to_assets.pop("equity", ())))
update_hits(self.retrieve_futures_contracts(type_to_assets.pop("future", ())))
# We shouldn't know about any other asset types.
if type_to_assets:
raise AssertionError("Found asset types: %s" % list(type_to_assets.keys()))
return [hits[sid] for sid in sids]
[docs] def retrieve_equities(self, sids):
"""Retrieve Equity objects for a list of sids.
Users generally shouldn't need to this method (instead, they should
prefer the more general/friendly `retrieve_assets`), but it has a
documented interface and tests because it's used upstream.
Parameters
----------
sids : iterable[int]
Returns
-------
equities : dict[int -> Equity]
Raises
------
EquitiesNotFound
When any requested asset isn't found.
"""
return self._retrieve_assets(sids, self.equities, Equity)
def _retrieve_equity(self, sid):
return self.retrieve_equities((sid,))[sid]
[docs] def retrieve_futures_contracts(self, sids):
"""Retrieve Future objects for an iterable of sids.
Users generally shouldn't need to this method (instead, they should
prefer the more general/friendly `retrieve_assets`), but it has a
documented interface and tests because it's used upstream.
Parameters
----------
sids : iterable[int]
Returns
-------
equities : dict[int -> Equity]
Raises
------
EquitiesNotFound
When any requested asset isn't found.
"""
return self._retrieve_assets(sids, self.futures_contracts, Future)
@staticmethod
def _select_assets_by_sid(asset_tbl, sids):
return sa.select(asset_tbl).where(asset_tbl.c.sid.in_(map(int, sids)))
@staticmethod
def _select_asset_by_symbol(asset_tbl, symbol):
return sa.select(asset_tbl).where(asset_tbl.c.symbol == symbol)
def _select_most_recent_symbols_chunk(self, sid_group):
"""Retrieve the most recent symbol for a set of sids.
Parameters
----------
sid_group : iterable[int]
The sids to lookup. The length of this sequence must be less than
or equal to SQLITE_MAX_VARIABLE_NUMBER because the sids will be
passed in as sql bind params.
Returns
-------
sel : Selectable
The sqlalchemy selectable that will query for the most recent
symbol for each sid.
Notes
-----
This is implemented as an inner select of the columns of interest
ordered by the end date of the (sid, symbol) mapping. We then group
that inner select on the sid with no aggregations to select the last
row per group which gives us the most recently active symbol for all
of the sids.
"""
cols = self.equity_symbol_mappings.c
# These are the columns we actually want.
data_cols = (cols.sid,) + tuple(cols[name] for name in SYMBOL_COLUMNS)
# Also select the max of end_date so that all non-grouped fields take
# on the value associated with the max end_date.
# to_select = data_cols + (sa.func.max(cols.end_date),)
func_rank = (
sa.func.rank()
.over(order_by=cols.end_date.desc(), partition_by=cols.sid)
.label("rnk")
)
to_select = data_cols + (func_rank,)
subquery = (
sa.select(*to_select)
.where(cols.sid.in_(map(int, sid_group)))
.subquery("sq")
)
query = (
sa.select(subquery.columns)
.filter(subquery.c.rnk == 1)
.select_from(subquery)
)
return query
def _lookup_most_recent_symbols(self, sids):
with self.engine.connect() as conn:
return {
row.sid: {c: row[c] for c in SYMBOL_COLUMNS}
for row in concat(
conn.execute(self._select_most_recent_symbols_chunk(sid_group))
.mappings()
.fetchall()
for sid_group in partition_all(SQLITE_MAX_VARIABLE_NUMBER, sids)
)
}
def _retrieve_asset_dicts(self, sids, asset_tbl, querying_equities):
if not sids:
return
if querying_equities:
def mkdict(
row,
exchanges=self.exchange_info,
symbols=self._lookup_most_recent_symbols(sids),
):
d = dict(row)
d["exchange_info"] = exchanges[d.pop("exchange")]
# we are not required to have a symbol for every asset, if
# we don't have any symbols we will just use the empty string
return merge(d, symbols.get(row["sid"], {}))
else:
def mkdict(row, exchanges=self.exchange_info):
d = dict(row)
d["exchange_info"] = exchanges[d.pop("exchange")]
return d
for assets in group_into_chunks(sids):
# Load misses from the db.
query = self._select_assets_by_sid(asset_tbl, assets)
with self.engine.connect() as conn:
for row in conn.execute(query).mappings().fetchall():
yield _convert_asset_timestamp_fields(mkdict(row))
def _retrieve_assets(self, sids, asset_tbl, asset_type):
"""Internal function for loading assets from a table.
This should be the only method of `AssetFinder` that writes Assets into
self._asset_cache.
Parameters
---------
sids : iterable of int
Asset ids to look up.
asset_tbl : sqlalchemy.Table
Table from which to query assets.
asset_type : type
Type of asset to be constructed.
Returns
-------
assets : dict[int -> Asset]
Dict mapping requested sids to the retrieved assets.
"""
# Fastpath for empty request.
if not sids:
return {}
cache = self._asset_cache
hits = {}
querying_equities = issubclass(asset_type, Equity)
filter_kwargs = (
_filter_equity_kwargs if querying_equities else _filter_future_kwargs
)
rows = self._retrieve_asset_dicts(sids, asset_tbl, querying_equities)
for row in rows:
sid = row["sid"]
asset = asset_type(**filter_kwargs(row))
hits[sid] = cache[sid] = asset
# If we get here, it means something in our code thought that a
# particular sid was an equity/future and called this function with a
# concrete type, but we couldn't actually resolve the asset. This is
# an error in our code, not a user-input error.
misses = tuple(set(sids) - hits.keys())
if misses:
if querying_equities:
raise EquitiesNotFound(sids=misses)
else:
raise FutureContractsNotFound(sids=misses)
return hits
def _lookup_symbol_strict(self, ownership_map, multi_country, symbol, as_of_date):
"""Resolve a symbol to an asset object without fuzzy matching.
Parameters
----------
ownership_map : dict[(str, str), list[OwnershipPeriod]]
The mapping from split symbols to ownership periods.
multi_country : bool
Does this mapping span multiple countries?
symbol : str
The symbol to look up.
as_of_date : datetime or None
If multiple assets have held this sid, which day should the
resolution be checked against? If this value is None and multiple
sids have held the ticker, then a MultipleSymbolsFound error will
be raised.
Returns
-------
asset : Asset
The asset that held the given symbol.
Raises
------
SymbolNotFound
Raised when the symbol or symbol as_of_date pair do not map to
any assets.
MultipleSymbolsFound
Raised when multiple assets held the symbol. This happens if
multiple assets held the symbol at disjoint times and
``as_of_date`` is None, or if multiple assets held the symbol at
the same time and``multi_country`` is True.
Notes
-----
The resolution algorithm is as follows:
- Split the symbol into the company and share class component.
- Do a dictionary lookup of the
``(company_symbol, share_class_symbol)`` in the provided ownership
map.
- If there is no entry in the dictionary, we don't know about this
symbol so raise a ``SymbolNotFound`` error.
- If ``as_of_date`` is None:
- If more there is more than one owner, raise
``MultipleSymbolsFound``
- Otherwise, because the list mapped to a symbol cannot be empty,
return the single asset.
- Iterate through all of the owners:
- If the ``as_of_date`` is between the start and end of the ownership
period:
- If multi_country is False, return the found asset.
- Otherwise, put the asset in a list.
- At the end of the loop, if there are no candidate assets, raise a
``SymbolNotFound``.
- If there is exactly one candidate, return it.
- Othewise, raise ``MultipleSymbolsFound`` because the ticker is not
unique across countries.
"""
# split the symbol into the components, if there are no
# company/share class parts then share_class_symbol will be empty
company_symbol, share_class_symbol = split_delimited_symbol(symbol)
try:
owners = ownership_map[company_symbol, share_class_symbol]
assert owners, "empty owners list for %r" % symbol
except KeyError as exc:
# no equity has ever held this symbol
raise SymbolNotFound(symbol=symbol) from exc
if not as_of_date:
# exactly one equity has ever held this symbol, we may resolve
# without the date
if len(owners) == 1:
return self.retrieve_asset(owners[0].sid)
options = {self.retrieve_asset(owner.sid) for owner in owners}
if multi_country:
country_codes = map(attrgetter("country_code"), options)
if len(set(country_codes)) > 1:
raise SameSymbolUsedAcrossCountries(
symbol=symbol, options=dict(zip(country_codes, options))
)
# more than one equity has held this ticker, this
# is ambiguous without the date
raise MultipleSymbolsFound(symbol=symbol, options=options)
options = []
country_codes = []
for start, end, sid, _ in owners:
if start <= as_of_date < end:
# find the equity that owned it on the given asof date
asset = self.retrieve_asset(sid)
# if this asset owned the symbol on this asof date and we are
# only searching one country, return that asset
if not multi_country:
return asset
else:
options.append(asset)
country_codes.append(asset.country_code)
if not options:
# no equity held the ticker on the given asof date
raise SymbolNotFound(symbol=symbol)
# if there is one valid option given the asof date, return that option
if len(options) == 1:
return options[0]
# if there's more than one option given the asof date, a country code
# must be passed to resolve the symbol to an asset
raise SameSymbolUsedAcrossCountries(
symbol=symbol, options=dict(zip(country_codes, options))
)
def _lookup_symbol_fuzzy(self, ownership_map, multi_country, symbol, as_of_date):
symbol = symbol.upper()
company_symbol, share_class_symbol = split_delimited_symbol(symbol)
try:
owners = ownership_map[company_symbol + share_class_symbol]
assert owners, "empty owners list for %r" % symbol
except KeyError as exc:
# no equity has ever held a symbol matching the fuzzy symbol
raise SymbolNotFound(symbol=symbol) from exc
if not as_of_date:
if len(owners) == 1:
# only one valid match
return self.retrieve_asset(owners[0].sid)
options = []
for _, _, sid, sym in owners:
if sym == symbol:
# there are multiple options, look for exact matches
options.append(self.retrieve_asset(sid))
if len(options) == 1:
# there was only one exact match
return options[0]
# there is more than one exact match for this fuzzy symbol
raise MultipleSymbolsFoundForFuzzySymbol(
symbol=symbol,
options=self.retrieve_all(owner.sid for owner in owners),
)
options = {}
for start, end, sid, sym in owners:
if start <= as_of_date < end:
# see which fuzzy symbols were owned on the asof date.
options[sid] = sym
if not options:
# no equity owned the fuzzy symbol on the date requested
raise SymbolNotFound(symbol=symbol)
sid_keys = list(options.keys())
# If there was only one owner, or there is a fuzzy and non-fuzzy which
# map to the same sid, return it.
if len(options) == 1:
return self.retrieve_asset(sid_keys[0])
exact_options = []
for sid, sym in options.items():
# Possible to have a scenario where multiple fuzzy matches have the
# same date. Want to find the one where symbol and share class
# match.
if (company_symbol, share_class_symbol) == split_delimited_symbol(sym):
asset = self.retrieve_asset(sid)
if not multi_country:
return asset
else:
exact_options.append(asset)
if len(exact_options) == 1:
return exact_options[0]
# multiple equities held tickers matching the fuzzy ticker but
# there are no exact matches
raise MultipleSymbolsFoundForFuzzySymbol(
symbol=symbol,
options=self.retrieve_all(owner.sid for owner in owners),
)
def _choose_fuzzy_symbol_ownership_map(self, country_code):
if country_code is None:
return self.fuzzy_symbol_ownership_map
return self.fuzzy_symbol_ownership_maps_by_country_code.get(
country_code,
)
def _choose_symbol_ownership_map(self, country_code):
if country_code is None:
return self.symbol_ownership_map
return self.symbol_ownership_maps_by_country_code.get(country_code)
[docs] def lookup_symbol(self, symbol, as_of_date, fuzzy=False, country_code=None):
"""Lookup an equity by symbol.
Parameters
----------
symbol : str
The ticker symbol to resolve.
as_of_date : datetime.datetime or None
Look up the last owner of this symbol as of this datetime.
If ``as_of_date`` is None, then this can only resolve the equity
if exactly one equity has ever owned the ticker.
fuzzy : bool, optional
Should fuzzy symbol matching be used? Fuzzy symbol matching
attempts to resolve differences in representations for
shareclasses. For example, some people may represent the ``A``
shareclass of ``BRK`` as ``BRK.A``, where others could write
``BRK_A``.
country_code : str or None, optional
The country to limit searches to. If not provided, the search will
span all countries which increases the likelihood of an ambiguous
lookup.
Returns
-------
equity : Equity
The equity that held ``symbol`` on the given ``as_of_date``, or the
only equity to hold ``symbol`` if ``as_of_date`` is None.
Raises
------
SymbolNotFound
Raised when no equity has ever held the given symbol.
MultipleSymbolsFound
Raised when no ``as_of_date`` is given and more than one equity
has held ``symbol``. This is also raised when ``fuzzy=True`` and
there are multiple candidates for the given ``symbol`` on the
``as_of_date``. Also raised when no ``country_code`` is given and
the symbol is ambiguous across multiple countries.
"""
if symbol is None:
raise TypeError(
"Cannot lookup asset for symbol of None for "
"as of date %s." % as_of_date
)
if fuzzy:
f = self._lookup_symbol_fuzzy
mapping = self._choose_fuzzy_symbol_ownership_map(country_code)
else:
f = self._lookup_symbol_strict
mapping = self._choose_symbol_ownership_map(country_code)
if mapping is None:
raise SymbolNotFound(symbol=symbol)
return f(
mapping,
country_code is None,
symbol,
as_of_date,
)
[docs] def lookup_symbols(self, symbols, as_of_date, fuzzy=False, country_code=None):
"""Lookup a list of equities by symbol.
Equivalent to::
[finder.lookup_symbol(s, as_of, fuzzy) for s in symbols]
but potentially faster because repeated lookups are memoized.
Parameters
----------
symbols : sequence[str]
Sequence of ticker symbols to resolve.
as_of_date : pd.Timestamp
Forwarded to ``lookup_symbol``.
fuzzy : bool, optional
Forwarded to ``lookup_symbol``.
country_code : str or None, optional
The country to limit searches to. If not provided, the search will
span all countries which increases the likelihood of an ambiguous
lookup.
Returns
-------
equities : list[Equity]
"""
if not symbols:
return []
multi_country = country_code is None
if fuzzy:
f = self._lookup_symbol_fuzzy
mapping = self._choose_fuzzy_symbol_ownership_map(country_code)
else:
f = self._lookup_symbol_strict
mapping = self._choose_symbol_ownership_map(country_code)
if mapping is None:
raise SymbolNotFound(symbol=symbols[0])
memo = {}
out = []
append_output = out.append
for sym in symbols:
if sym in memo:
append_output(memo[sym])
else:
equity = memo[sym] = f(
mapping,
multi_country,
sym,
as_of_date,
)
append_output(equity)
return out
[docs] def lookup_future_symbol(self, symbol):
"""Lookup a future contract by symbol.
Parameters
----------
symbol : str
The symbol of the desired contract.
Returns
-------
future : Future
The future contract referenced by ``symbol``.
Raises
------
SymbolNotFound
Raised when no contract named 'symbol' is found.
"""
with self.engine.connect() as conn:
data = (
conn.execute(
self._select_asset_by_symbol(self.futures_contracts, symbol)
)
.mappings()
.fetchone()
)
# If no data found, raise an exception
if not data:
raise SymbolNotFound(symbol=symbol)
return self.retrieve_asset(data["sid"])
def lookup_by_supplementary_field(self, field_name, value, as_of_date):
try:
owners = self.equity_supplementary_map[
field_name,
value,
]
assert owners, "empty owners list for field %r (sid: %r)" % (
field_name,
value,
)
except KeyError as exc:
# no equity has ever held this value
raise ValueNotFoundForField(field=field_name, value=value) from exc
if not as_of_date:
if len(owners) > 1:
# more than one equity has held this value, this is ambigious
# without the date
raise MultipleValuesFoundForField(
field=field_name,
value=value,
options=set(
map(
compose(self.retrieve_asset, attrgetter("sid")),
owners,
)
),
)
# exactly one equity has ever held this value, we may resolve
# without the date
return self.retrieve_asset(owners[0].sid)
for start, end, sid, _ in owners:
if start <= as_of_date < end:
# find the equity that owned it on the given asof date
return self.retrieve_asset(sid)
# no equity held the value on the given asof date
raise ValueNotFoundForField(field=field_name, value=value)
[docs] def get_supplementary_field(self, sid, field_name, as_of_date):
"""Get the value of a supplementary field for an asset.
Parameters
----------
sid : int
The sid of the asset to query.
field_name : str
Name of the supplementary field.
as_of_date : pd.Timestamp, None
The last known value on this date is returned. If None, a
value is returned only if we've only ever had one value for
this sid. If None and we've had multiple values,
MultipleValuesFoundForSid is raised.
Raises
------
NoValueForSid
If we have no values for this asset, or no values was known
on this as_of_date.
MultipleValuesFoundForSid
If we have had multiple values for this asset over time, and
None was passed for as_of_date.
"""
try:
periods = self.equity_supplementary_map_by_sid[
field_name,
sid,
]
assert periods, "empty periods list for field %r and sid %r" % (
field_name,
sid,
)
except KeyError:
raise NoValueForSid(field=field_name, sid=sid) from KeyError
if not as_of_date:
if len(periods) > 1:
# This equity has held more than one value, this is ambigious
# without the date
raise MultipleValuesFoundForSid(
field=field_name,
sid=sid,
options={p.value for p in periods},
)
# this equity has only ever held this value, we may resolve
# without the date
return periods[0].value
for start, end, _, value in periods:
if start <= as_of_date < end:
return value
# Could not find a value for this sid on the as_of_date.
raise NoValueForSid(field=field_name, sid=sid)
def _get_contract_sids(self, root_symbol):
fc_cols = self.futures_contracts.c
with self.engine.connect() as conn:
return (
conn.execute(
sa.select(
fc_cols.sid,
)
.where(
(fc_cols.root_symbol == root_symbol)
& (fc_cols.start_date != pd.NaT.value)
)
.order_by(fc_cols.sid)
)
.scalars()
.fetchall()
)
def _get_root_symbol_exchange(self, root_symbol):
fc_cols = self.futures_root_symbols.c
fields = (fc_cols.exchange,)
with self.engine.connect() as conn:
exchange = conn.execute(
sa.select(*fields).where(fc_cols.root_symbol == root_symbol)
).scalar()
if exchange is not None:
return exchange
else:
raise SymbolNotFound(symbol=root_symbol)
def get_ordered_contracts(self, root_symbol):
try:
return self._ordered_contracts[root_symbol]
except KeyError:
contract_sids = self._get_contract_sids(root_symbol)
contracts = deque(self.retrieve_all(contract_sids))
chain_predicate = self._future_chain_predicates.get(root_symbol, None)
oc = OrderedContracts(root_symbol, contracts, chain_predicate)
self._ordered_contracts[root_symbol] = oc
return oc
def create_continuous_future(self, root_symbol, offset, roll_style, adjustment):
if adjustment not in ADJUSTMENT_STYLES:
raise ValueError(
f"Invalid adjustment style {adjustment!r}. Allowed adjustment styles are "
f"{list(ADJUSTMENT_STYLES)}."
)
oc = self.get_ordered_contracts(root_symbol)
exchange = self._get_root_symbol_exchange(root_symbol)
sid = _encode_continuous_future_sid(root_symbol, offset, roll_style, None)
mul_sid = _encode_continuous_future_sid(root_symbol, offset, roll_style, "div")
add_sid = _encode_continuous_future_sid(root_symbol, offset, roll_style, "add")
cf_template = partial(
ContinuousFuture,
root_symbol=root_symbol,
offset=offset,
roll_style=roll_style,
start_date=oc.start_date,
end_date=oc.end_date,
exchange_info=self.exchange_info[exchange],
)
cf = cf_template(sid=sid)
mul_cf = cf_template(sid=mul_sid, adjustment="mul")
add_cf = cf_template(sid=add_sid, adjustment="add")
self._asset_cache[cf.sid] = cf
self._asset_cache[mul_cf.sid] = mul_cf
self._asset_cache[add_cf.sid] = add_cf
return {None: cf, "mul": mul_cf, "add": add_cf}[adjustment]
def _make_sids(tblattr):
def _(self):
with self.engine.connect() as conn:
return tuple(
conn.execute(sa.select(getattr(self, tblattr).c.sid))
.scalars()
.fetchall()
)
return _
sids = property(
_make_sids("asset_router"),
doc="All the sids in the asset finder.",
)
equities_sids = property(
_make_sids("equities"),
doc="All of the sids for equities in the asset finder.",
)
futures_sids = property(
_make_sids("futures_contracts"),
doc="All of the sids for futures consracts in the asset finder.",
)
del _make_sids
def _lookup_generic_scalar(self, obj, as_of_date, country_code, matches, missing):
"""
Convert asset_convertible to an asset.
On success, append to matches.
On failure, append to missing.
"""
result = self._lookup_generic_scalar_helper(
obj,
as_of_date,
country_code,
)
if result is not None:
matches.append(result)
else:
missing.append(obj)
def _lookup_generic_scalar_helper(self, obj, as_of_date, country_code):
if isinstance(obj, (Asset, ContinuousFuture)):
return obj
if isinstance(obj, Integral):
try:
return self.retrieve_asset(int(obj))
except SidsNotFound:
return None
if isinstance(obj, str):
# Try to look up as an equity first.
try:
return self.lookup_symbol(
symbol=obj, as_of_date=as_of_date, country_code=country_code
)
except SymbolNotFound:
# Fall back to lookup as a Future
try:
# TODO: Support country_code for future_symbols?
return self.lookup_future_symbol(obj)
except SymbolNotFound:
return None
raise NotAssetConvertible("Input was %s, not AssetConvertible." % obj)
[docs] def lookup_generic(self, obj, as_of_date, country_code):
"""Convert an object into an Asset or sequence of Assets.
This method exists primarily as a convenience for implementing
user-facing APIs that can handle multiple kinds of input. It should
not be used for internal code where we already know the expected types
of our inputs.
Parameters
----------
obj : int, str, Asset, ContinuousFuture, or iterable
The object to be converted into one or more Assets.
Integers are interpreted as sids. Strings are interpreted as
tickers. Assets and ContinuousFutures are returned unchanged.
as_of_date : pd.Timestamp or None
Timestamp to use to disambiguate ticker lookups. Has the same
semantics as in `lookup_symbol`.
country_code : str or None
ISO-3166 country code to use to disambiguate ticker lookups. Has
the same semantics as in `lookup_symbol`.
Returns
-------
matches, missing : tuple
``matches`` is the result of the conversion. ``missing`` is a list
containing any values that couldn't be resolved. If ``obj`` is not
an iterable, ``missing`` will be an empty list.
"""
matches = []
missing = []
# Interpret input as scalar.
if isinstance(obj, (AssetConvertible, ContinuousFuture)):
self._lookup_generic_scalar(
obj=obj,
as_of_date=as_of_date,
country_code=country_code,
matches=matches,
missing=missing,
)
try:
return matches[0], missing
except IndexError:
if hasattr(obj, "__int__"):
raise SidsNotFound(sids=[obj]) from IndexError
else:
raise SymbolNotFound(symbol=obj) from IndexError
# Interpret input as iterable.
try:
iterator = iter(obj)
except TypeError:
raise NotAssetConvertible(
"Input was not a AssetConvertible or iterable of AssetConvertible."
) from TypeError
for obj in iterator:
self._lookup_generic_scalar(
obj=obj,
as_of_date=as_of_date,
country_code=country_code,
matches=matches,
missing=missing,
)
return matches, missing
def _compute_asset_lifetimes(self, **kwargs):
"""Compute and cache a recarray of asset lifetimes"""
sids = starts = ends = []
equities_cols = self.equities.c
exchanges_cols = self.exchanges.c
if len(kwargs) == 1:
if "country_codes" in kwargs.keys():
condt = exchanges_cols.country_code.in_(kwargs["country_codes"])
if "exchange_names" in kwargs.keys():
condt = exchanges_cols.exchange.in_(kwargs["exchange_names"])
with self.engine.connect() as conn:
results = conn.execute(
sa.select(
equities_cols.sid,
equities_cols.start_date,
equities_cols.end_date,
).where(
(exchanges_cols.exchange == equities_cols.exchange) & (condt)
)
).fetchall()
if results:
sids, starts, ends = zip(*results)
sid = np.array(sids, dtype="i8")
start = np.array(starts, dtype="f8")
end = np.array(ends, dtype="f8")
start[np.isnan(start)] = 0 # convert missing starts to 0
end[np.isnan(end)] = np.iinfo(int).max # convert missing end to INTMAX
return Lifetimes(sid, start.astype("i8"), end.astype("i8"))
[docs] def lifetimes(self, dates, include_start_date, country_codes):
"""Compute a DataFrame representing asset lifetimes for the specified date
range.
Parameters
----------
dates : pd.DatetimeIndex
The dates for which to compute lifetimes.
include_start_date : bool
Whether or not to count the asset as alive on its start_date.
This is useful in a backtesting context where `lifetimes` is being
used to signify "do I have data for this asset as of the morning of
this date?" For many financial metrics, (e.g. daily close), data
isn't available for an asset until the end of the asset's first
day.
country_codes : iterable[str]
The country codes to get lifetimes for.
Returns
-------
lifetimes : pd.DataFrame
A frame of dtype bool with `dates` as index and an Int64Index of
assets as columns. The value at `lifetimes.loc[date, asset]` will
be True iff `asset` existed on `date`. If `include_start_date` is
False, then lifetimes.loc[date, asset] will be false when date ==
asset.start_date.
See Also
--------
numpy.putmask
zipline.pipeline.engine.SimplePipelineEngine._compute_root_mask
"""
if isinstance(country_codes, str):
raise TypeError(
"Got string {!r} instead of an iterable of strings in "
"AssetFinder.lifetimes.".format(country_codes),
)
# normalize to a cache-key so that we can memoize results.
country_codes = frozenset(country_codes)
lifetimes = self._asset_lifetimes.get(country_codes)
if lifetimes is None:
self._asset_lifetimes[
country_codes
] = lifetimes = self._compute_asset_lifetimes(country_codes=country_codes)
raw_dates = as_column(dates.asi8)
if include_start_date:
mask = lifetimes.start <= raw_dates
else:
mask = lifetimes.start < raw_dates
mask &= raw_dates <= lifetimes.end
return pd.DataFrame(mask, index=dates, columns=lifetimes.sid)
[docs] def equities_sids_for_country_code(self, country_code):
"""Return all of the sids for a given country.
Parameters
----------
country_code : str
An ISO 3166 alpha-2 country code.
Returns
-------
tuple[int]
The sids whose exchanges are in this country.
"""
sids = self._compute_asset_lifetimes(country_codes=[country_code]).sid
return tuple(sids.tolist())
[docs] def equities_sids_for_exchange_name(self, exchange_name):
"""Return all of the sids for a given exchange_name.
Parameters
----------
exchange_name : str
Returns
-------
tuple[int]
The sids whose exchanges are in this country.
"""
sids = self._compute_asset_lifetimes(exchange_names=[exchange_name]).sid
return tuple(sids.tolist())
[docs]class AssetConvertible(ABC):
"""
ABC for types that are convertible to integer-representations of
Assets.
Includes Asset, str, and Integral
"""
pass
AssetConvertible.register(Integral)
AssetConvertible.register(Asset)
AssetConvertible.register(str)
class NotAssetConvertible(ValueError):
pass
class PricingDataAssociable(ABC):
"""ABC for types that can be associated with pricing data.
Includes Asset, Future, ContinuousFuture
"""
pass
PricingDataAssociable.register(Asset)
PricingDataAssociable.register(Future)
PricingDataAssociable.register(ContinuousFuture)
def was_active(reference_date_value, asset):
"""Whether or not `asset` was active at the time corresponding to
`reference_date_value`.
Parameters
----------
reference_date_value : int
Date, represented as nanoseconds since EPOCH, for which we want to know
if `asset` was alive. This is generally the result of accessing the
`value` attribute of a pandas Timestamp.
asset : Asset
The asset object to check.
Returns
-------
was_active : bool
Whether or not the `asset` existed at the specified time.
"""
return asset.start_date.value <= reference_date_value <= asset.end_date.value
def only_active_assets(reference_date_value, assets):
"""Filter an iterable of Asset objects down to just assets that were alive at
the time corresponding to `reference_date_value`.
Parameters
----------
reference_date_value : int
Date, represented as nanoseconds since EPOCH, for which we want to know
if `asset` was alive. This is generally the result of accessing the
`value` attribute of a pandas Timestamp.
assets : iterable[Asset]
The assets to filter.
Returns
-------
active_assets : list
List of the active assets from `assets` on the requested date.
"""
return [a for a in assets if was_active(reference_date_value, a)]