#
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from collections import namedtuple
import numpy as np
import pandas as pd
import sqlalchemy as sa
from toolz import first
from zipline.assets.asset_db_schema import (
ASSET_DB_VERSION,
asset_db_table_names,
asset_router,
)
from zipline.assets.asset_db_schema import equities as equities_table
from zipline.assets.asset_db_schema import (
equity_supplementary_mappings as equity_supplementary_mappings_table,
)
from zipline.assets.asset_db_schema import equity_symbol_mappings
from zipline.assets.asset_db_schema import exchanges as exchanges_table
from zipline.assets.asset_db_schema import futures_contracts as futures_contracts_table
from zipline.assets.asset_db_schema import futures_root_symbols, metadata, version_info
from zipline.errors import AssetDBVersionError
from zipline.utils.compat import ExitStack
from zipline.utils.preprocess import preprocess
from zipline.utils.range import from_tuple, intersecting_ranges
from zipline.utils.sqlite_utils import coerce_string_to_eng
# Define a namedtuple for use with the _load_data method
AssetData = namedtuple(
"AssetData",
(
"equities",
"equities_mappings",
"futures",
"exchanges",
"root_symbols",
"equity_supplementary_mappings",
),
)
SQLITE_MAX_VARIABLE_NUMBER = 999
symbol_columns = frozenset(
{
"symbol",
"company_symbol",
"share_class_symbol",
}
)
mapping_columns = symbol_columns | {"start_date", "end_date"}
_index_columns = {
"equities": "sid",
"equity_supplementary_mappings": "sid",
"futures": "sid",
"exchanges": "exchange",
"root_symbols": "root_symbol",
}
def _normalize_index_columns_in_place(
equities, equity_supplementary_mappings, futures, exchanges, root_symbols
):
"""
Update dataframes in place to set identifier columns as indices.
For each input frame, if the frame has a column with the same name as its
associated index column, set that column as the index.
Otherwise, assume the index already contains identifiers.
If frames are passed as None, they're ignored.
"""
for frame, column_name in (
(equities, "sid"),
(equity_supplementary_mappings, "sid"),
(futures, "sid"),
(exchanges, "exchange"),
(root_symbols, "root_symbol"),
):
if frame is not None and column_name in frame:
frame.set_index(column_name, inplace=True)
def _default_none(df, column):
return None
def _no_default(df, column):
if not df.empty:
raise ValueError("no default value for column %r" % column)
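# Each default below is a callable of (dataframe, column_name) that
# produces the value to insert when the user did not supply that column.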
# Default values for the equities DataFrame
_equities_defaults = {
"symbol": _default_none,
"asset_name": _default_none,
"start_date": lambda df, col: 0,
"end_date": lambda df, col: np.iinfo(np.int64).max,
"first_traded": _default_none,
"auto_close_date": _default_none,
# the full exchange name
"exchange": _no_default,
}
# the defaults for ``equities`` in ``write_direct``
_direct_equities_defaults = _equities_defaults.copy()
del _direct_equities_defaults["symbol"]
# Default values for the futures DataFrame
_futures_defaults = {
"symbol": _default_none,
"root_symbol": _default_none,
"asset_name": _default_none,
"start_date": lambda df, col: 0,
"end_date": lambda df, col: np.iinfo(np.int64).max,
"first_traded": _default_none,
"exchange": _default_none,
"notice_date": _default_none,
"expiration_date": _default_none,
"auto_close_date": _default_none,
"tick_size": _default_none,
"multiplier": lambda df, col: 1,
}
# Default values for the exchanges DataFrame
_exchanges_defaults = {
"canonical_name": lambda df, col: df.index,
"country_code": lambda df, col: "??",
}
# Default values for the root_symbols DataFrame
_root_symbols_defaults = {
"sector": _default_none,
"description": _default_none,
"exchange": _default_none,
}
# Default values for the equity_supplementary_mappings DataFrame
_equity_supplementary_mappings_defaults = {
"value": _default_none,
"field": _default_none,
"start_date": lambda df, col: 0,
"end_date": lambda df, col: np.iinfo(np.int64).max,
}
# Default values for the equity_symbol_mappings DataFrame
_equity_symbol_mappings_defaults = {
"sid": _no_default,
"company_symbol": _default_none,
"share_class_symbol": _default_none,
"symbol": _default_none,
"start_date": lambda df, col: 0,
"end_date": lambda df, col: np.iinfo(np.int64).max,
}
# Fuzzy symbol delimiters that may break up a company symbol and share class
_delimited_symbol_delimiters_regex = re.compile(r"[./\-_]")
_delimited_symbol_default_triggers = frozenset({np.nan, None, ""})
def split_delimited_symbol(symbol):
"""
Takes in a symbol that may be delimited and splits it into a company
symbol and a share class symbol. The company symbol is the portion
before the first delimiter and the share class symbol is the portion
after it (or the empty string if there is no delimiter).
Parameters
----------
symbol : str
The possibly-delimited symbol to be split
Returns
-------
company_symbol : str
The company part of the symbol.
share_class_symbol : str
The share class part of a symbol.
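Examples
--------
A sketch of the expected behavior::

    >>> split_delimited_symbol('BRK.A')
    ('BRK', 'A')
    >>> split_delimited_symbol('AAPL')
    ('AAPL', '')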
"""
# return blank strings for any bad fuzzy symbols, like NaN or None
if symbol in _delimited_symbol_default_triggers:
return "", ""
symbol = symbol.upper()
split_list = re.split(
pattern=_delimited_symbol_delimiters_regex,
string=symbol,
maxsplit=1,
)
# Break the list up into its two components, the company symbol and the
# share class symbol
company_symbol = split_list[0]
if len(split_list) > 1:
share_class_symbol = split_list[1]
else:
share_class_symbol = ""
return company_symbol, share_class_symbol
def _generate_output_dataframe(data_subset, defaults):
"""
Generates an output dataframe from the given subset of user-provided
data, the given column names, and the given default values.
Parameters
----------
data_subset : DataFrame
A DataFrame, usually from an AssetData object,
that contains the user's input metadata for the asset type being
processed
defaults : dict
A dict where the keys are the names of the columns of the desired
output DataFrame and the values are a function from dataframe and
column name to the default values to insert in the DataFrame if no user
data is provided
Returns
-------
DataFrame
A DataFrame containing all user-provided metadata, and default values
wherever user-provided metadata was missing
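Examples
--------
Illustrative only: given a ``data_subset`` with columns ``symbol`` and
``unknown``, and defaults for ``symbol`` and ``asset_name``, the
``unknown`` column is dropped and ``asset_name`` is filled with its
default value.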
"""
# The columns provided.
cols = set(data_subset.columns)
desired_cols = set(defaults)
# Drop columns with unrecognised headers.
data_subset.drop(cols - desired_cols, axis=1, inplace=True)
# Get those columns which we need but
# for which no data has been supplied.
for col in desired_cols - cols:
# write the default value for any missing columns
data_subset[col] = defaults[col](data_subset, col)
return data_subset
def _check_asset_group(group):
# workaround until fixed: https://github.com/pandas-dev/pandas/issues/47985
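# Collapse the per-symbol rows for a single sid into one asset row: keep
# the most recent row's fields, widen start_date/end_date to span all
# rows, and drop the symbol columns (they live in the mappings table).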
if group.empty:
return group
row = group.sort_values("end_date").iloc[-1]
row.start_date = group.start_date.min()
row.end_date = group.end_date.max()
row.drop(list(symbol_columns), inplace=True)
return row
def _format_range(r):
return (
str(pd.Timestamp(r.start, unit="ns")),
str(pd.Timestamp(r.stop, unit="ns")),
)
def _check_symbol_mappings(df, exchanges, asset_exchange):
"""Check that there are no cases where multiple symbols resolve to the same
asset at the same time in the same country.
Parameters
----------
df : pd.DataFrame
The equity symbol mappings table.
exchanges : pd.DataFrame
The exchanges table.
asset_exchange : pd.Series
A series that maps sids to the exchange the asset is in.
Raises
------
ValueError
Raised when there are ambiguous symbol mappings.
"""
mappings = df.set_index("sid")[list(mapping_columns)].copy()
try:
mappings["country_code"] = exchanges["country_code"][
asset_exchange.loc[df["sid"]]
].values
except KeyError:
mappings["country_code"] = exchanges.set_index("exchange")["country_code"].loc[
asset_exchange.loc[df["sid"]].values
]
ambiguous = {}
def check_intersections(persymbol):
intersections = list(
intersecting_ranges(
map(
from_tuple,
zip(persymbol.start_date, persymbol.end_date),
)
)
)
if intersections:
data = persymbol[["start_date", "end_date"]].astype("datetime64[ns]")
# indent the dataframe string, also compute this early because
# ``persymbol`` is a view and ``astype`` doesn't copy the index
# correctly in pandas 0.22
msg_component = "\n ".join(str(data).splitlines())
ambiguous[persymbol.name] = intersections, msg_component
mappings.groupby(["symbol", "country_code"], group_keys=False).apply(
check_intersections
)
if ambiguous:
raise ValueError(
"Ambiguous ownership for %d symbol%s, multiple assets held the"
" following symbols:\n%s"
% (
len(ambiguous),
"" if len(ambiguous) == 1 else "s",
"\n".join(
"%s (%s):\n intersections: %s\n %s"
% (
symbol,
country_code,
tuple(map(_format_range, intersections)),
cs,
)
for (symbol, country_code), (intersections, cs) in sorted(
ambiguous.items(),
key=first,
)
),
)
)
def _split_symbol_mappings(df, exchanges):
"""Split out the symbol: sid mappings from the raw data.
Parameters
----------
df : pd.DataFrame
The dataframe with multiple rows for each symbol: sid pair.
exchanges : pd.DataFrame
The exchanges table.
Returns
-------
asset_info : pd.DataFrame
The asset info with one row per asset.
symbol_mappings : pd.DataFrame
The dataframe of just symbol: sid mappings, with one row per mapping
and columns for the sid, the symbol fields, and the start_date and
end_date over which each mapping applies.
"""
mappings = df[list(mapping_columns)]
with pd.option_context("mode.chained_assignment", None):
mappings["sid"] = mappings.index
mappings.reset_index(drop=True, inplace=True)
# take the most recent sid->exchange mapping based on end date
asset_exchange = (
df[["exchange", "end_date"]]
.sort_values("end_date")
.groupby(level=0)["exchange"]
.nth(-1)
)
_check_symbol_mappings(mappings, exchanges, asset_exchange)
return (
df.groupby(level=0, group_keys=False).apply(_check_asset_group),
mappings,
)
def _dt_to_epoch_ns(dt_series: pd.Series) -> pd.Index:
"""Convert a timeseries into an Int64Index of nanoseconds since the epoch.
Parameters
----------
dt_series : pd.Series
The timeseries to convert.
Returns
-------
idx : pd.Index
The index converted to nanoseconds since the epoch.
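Examples
--------
A minimal sketch::

    >>> _dt_to_epoch_ns(pd.Series([pd.Timestamp("1970-01-01 00:00:01")]))[0]
    1000000000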
"""
index = pd.to_datetime(dt_series.values)
if index.tzinfo is None:
index = index.tz_localize("UTC")
else:
index = index.tz_convert("UTC")
return index.view(np.int64)
def check_version_info(conn, version_table, expected_version: int):
"""
Checks for a version value in the version table.
Parameters
----------
conn : Connection
The connection to use to perform the check.
version_table : sa.Table
The version table of the asset database
expected_version : int
The expected version of the asset database
Raises
------
AssetDBVersionError
If the version is in the table and not equal to ``expected_version``.
"""
# Read the version out of the table
version_from_table = conn.execute(sa.select(version_table.c.version)).scalar()
# A db without a version is considered v0
if version_from_table is None:
version_from_table = 0
# Raise an error if the versions do not match
if version_from_table != expected_version:
raise AssetDBVersionError(
db_version=version_from_table, expected_version=expected_version
)
def write_version_info(conn, version_table, version_value):
"""
Inserts the version value into the version table.
Parameters
----------
conn : sa.Connection
The connection to use to execute the insert.
version_table : sa.Table
The version table of the asset database
version_value : int
The version to write into the database
"""
if conn.engine.name == "postgresql":
conn.execute(sa.text("ALTER SEQUENCE version_info_id_seq RESTART WITH 1"))
conn.execute(version_table.insert().values(version=version_value))
class AssetDBWriter:
"""Class used to write data to an assets db.
Parameters
----------
engine : Engine or str
An SQLAlchemy engine or path to a SQL database.
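Examples
--------
A minimal usage sketch; the database path is hypothetical::

    writer = AssetDBWriter("assets.db")
    writer.write(equities=equities_frame)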
"""
DEFAULT_CHUNK_SIZE = SQLITE_MAX_VARIABLE_NUMBER
@preprocess(engine=coerce_string_to_eng(require_exists=False))
def __init__(self, engine):
self.engine = engine
def _real_write(
self,
equities,
equity_symbol_mappings,
equity_supplementary_mappings,
futures,
exchanges,
root_symbols,
chunk_size,
):
with self.engine.begin() as conn:
# Create SQL tables if they do not exist.
self.init_db(conn)
if exchanges is not None:
self._write_df_to_table(
exchanges_table,
exchanges,
conn,
chunk_size,
)
if root_symbols is not None:
self._write_df_to_table(
futures_root_symbols,
root_symbols,
conn,
chunk_size,
)
if equity_supplementary_mappings is not None:
self._write_df_to_table(
equity_supplementary_mappings_table,
equity_supplementary_mappings,
conn,
chunk_size,
)
if futures is not None:
self._write_assets(
"future",
futures,
conn,
chunk_size,
)
if equities is not None:
self._write_assets(
"equity",
equities,
conn,
chunk_size,
mapping_data=equity_symbol_mappings,
)
def write_direct(
self,
equities=None,
equity_symbol_mappings=None,
equity_supplementary_mappings=None,
futures=None,
exchanges=None,
root_symbols=None,
chunk_size=DEFAULT_CHUNK_SIZE,
):
"""Write asset metadata to a sqlite database in the format that it is
stored in the assets db.
Parameters
----------
equities : pd.DataFrame, optional
The equity metadata. The columns for this dataframe are:
symbol : str
The ticker symbol for this equity.
asset_name : str
The full name for this asset.
start_date : datetime
The date when this asset was created.
end_date : datetime, optional
The last date we have trade data for this asset.
first_traded : datetime, optional
The first date we have trade data for this asset.
auto_close_date : datetime, optional
The date on which to close any positions in this asset.
exchange : str
The exchange where this asset is traded.
The index of this dataframe should contain the sids.
futures : pd.DataFrame, optional
The future contract metadata. The columns for this dataframe are:
symbol : str
The ticker symbol for this futures contract.
root_symbol : str
The root symbol, or the symbol with the expiration stripped
out.
asset_name : str
The full name for this asset.
start_date : datetime, optional
The date when this asset was created.
end_date : datetime, optional
The last date we have trade data for this asset.
first_traded : datetime, optional
The first date we have trade data for this asset.
exchange : str
The exchange where this asset is traded.
notice_date : datetime
The date when the owner of the contract may be forced
to take physical delivery of the contract's asset.
expiration_date : datetime
The date when the contract expires.
auto_close_date : datetime
The date when the broker will automatically close any
positions in this contract.
tick_size : float
The minimum price movement of the contract.
multiplier : float
The amount of the underlying asset represented by this
contract.
exchanges : pd.DataFrame, optional
The exchanges where assets can be traded. The columns of this
dataframe are:
exchange : str
The full name of the exchange.
canonical_name : str
The canonical name of the exchange.
country_code : str
The ISO 3166 alpha-2 country code of the exchange.
root_symbols : pd.DataFrame, optional
The root symbols for the futures contracts. The columns for this
dataframe are:
root_symbol : str
The root symbol name.
root_symbol_id : int
The unique id for this root symbol.
sector : str, optional
The sector of this root symbol.
description : str, optional
A short description of this root symbol.
exchange : str
The exchange where this root symbol is traded.
equity_supplementary_mappings : pd.DataFrame, optional
Additional mappings from values of arbitrary type to assets.
chunk_size : int, optional
The number of rows to write to the SQLite table at once.
This defaults to the default number of bind parameters in sqlite.
If you have compiled sqlite3 with a higher or lower limit on bind
parameters, you may want to pass that value here.
"""
if equities is not None:
equities = _generate_output_dataframe(
equities,
_direct_equities_defaults,
)
if equity_symbol_mappings is None:
raise ValueError(
"equities provided with no symbol mapping data",
)
equity_symbol_mappings = _generate_output_dataframe(
equity_symbol_mappings,
_equity_symbol_mappings_defaults,
)
_check_symbol_mappings(
equity_symbol_mappings,
exchanges,
equities["exchange"],
)
if equity_supplementary_mappings is not None:
equity_supplementary_mappings = _generate_output_dataframe(
equity_supplementary_mappings,
_equity_supplementary_mappings_defaults,
)
if futures is not None:
futures = _generate_output_dataframe(futures, _futures_defaults)
if exchanges is not None:
exchanges = _generate_output_dataframe(
exchanges.set_index("exchange"),
_exchanges_defaults,
)
if root_symbols is not None:
root_symbols = _generate_output_dataframe(
root_symbols,
_root_symbols_defaults,
)
# Set named identifier columns as indices, if provided.
_normalize_index_columns_in_place(
equities=equities,
equity_supplementary_mappings=equity_supplementary_mappings,
futures=futures,
exchanges=exchanges,
root_symbols=root_symbols,
)
self._real_write(
equities=equities,
equity_symbol_mappings=equity_symbol_mappings,
equity_supplementary_mappings=equity_supplementary_mappings,
futures=futures,
exchanges=exchanges,
root_symbols=root_symbols,
chunk_size=chunk_size,
)
def write(
self,
equities=None,
futures=None,
exchanges=None,
root_symbols=None,
equity_supplementary_mappings=None,
chunk_size=DEFAULT_CHUNK_SIZE,
):
"""Write asset metadata to a sqlite database.
Parameters
----------
equities : pd.DataFrame, optional
The equity metadata. The columns for this dataframe are:
symbol : str
The ticker symbol for this equity.
asset_name : str
The full name for this asset.
start_date : datetime
The date when this asset was created.
end_date : datetime, optional
The last date we have trade data for this asset.
first_traded : datetime, optional
The first date we have trade data for this asset.
auto_close_date : datetime, optional
The date on which to close any positions in this asset.
exchange : str
The exchange where this asset is traded.
The index of this dataframe should contain the sids.
futures : pd.DataFrame, optional
The future contract metadata. The columns for this dataframe are:
symbol : str
The ticker symbol for this futures contract.
root_symbol : str
The root symbol, or the symbol with the expiration stripped
out.
asset_name : str
The full name for this asset.
start_date : datetime, optional
The date when this asset was created.
end_date : datetime, optional
The last date we have trade data for this asset.
first_traded : datetime, optional
The first date we have trade data for this asset.
exchange : str
The exchange where this asset is traded.
notice_date : datetime
The date when the owner of the contract may be forced
to take physical delivery of the contract's asset.
expiration_date : datetime
The date when the contract expires.
auto_close_date : datetime
The date when the broker will automatically close any
positions in this contract.
tick_size : float
The minimum price movement of the contract.
multiplier : float
The amount of the underlying asset represented by this
contract.
exchanges : pd.DataFrame, optional
The exchanges where assets can be traded. The columns of this
dataframe are:
exchange : str
The full name of the exchange.
canonical_name : str
The canonical name of the exchange.
country_code : str
The ISO 3166 alpha-2 country code of the exchange.
root_symbols : pd.DataFrame, optional
The root symbols for the futures contracts. The columns for this
dataframe are:
root_symbol : str
The root symbol name.
root_symbol_id : int
The unique id for this root symbol.
sector : str, optional
The sector of this root symbol.
description : str, optional
A short description of this root symbol.
exchange : str
The exchange where this root symbol is traded.
equity_supplementary_mappings : pd.DataFrame, optional
Additional mappings from values of arbitrary type to assets.
chunk_size : int, optional
The number of rows to write to the SQLite table at once.
This defaults to the default number of bind parameters in sqlite.
If you have compiled sqlite3 with a higher or lower limit on bind
parameters, you may want to pass that value here.
See Also
--------
zipline.assets.asset_finder
"""
if exchanges is None:
exchange_names = [
df["exchange"]
for df in (equities, futures, root_symbols)
if df is not None
]
if exchange_names:
exchanges = pd.DataFrame(
{
"exchange": pd.concat(exchange_names).unique(),
}
)
data = self._load_data(
equities if equities is not None else pd.DataFrame(),
futures if futures is not None else pd.DataFrame(),
exchanges if exchanges is not None else pd.DataFrame(),
root_symbols if root_symbols is not None else pd.DataFrame(),
(
equity_supplementary_mappings
if equity_supplementary_mappings is not None
else pd.DataFrame()
),
)
self._real_write(
equities=data.equities,
equity_symbol_mappings=data.equities_mappings,
equity_supplementary_mappings=data.equity_supplementary_mappings,
futures=data.futures,
root_symbols=data.root_symbols,
exchanges=data.exchanges,
chunk_size=chunk_size,
)
def _write_df_to_table(self, tbl, df, txn, chunk_size):
df = df.copy()
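# datetime64 columns are persisted as integer nanoseconds since the
# epoch, so convert them up front.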
for column, dtype in df.dtypes.items():
if dtype.kind == "M":
df[column] = _dt_to_epoch_ns(df[column])
if txn.dialect.name == "postgresql":
txn.execute(sa.text(f"ALTER TABLE {tbl.name} DISABLE TRIGGER ALL;"))
df.to_sql(
tbl.name,
txn,
index=True,
index_label=first(tbl.primary_key.columns).name,
if_exists="append",
chunksize=chunk_size,
)
def _write_assets(self, asset_type, assets, txn, chunk_size, mapping_data=None):
if asset_type == "future":
tbl = futures_contracts_table
if mapping_data is not None:
raise TypeError("no mapping data expected for futures")
elif asset_type == "equity":
tbl = equities_table
if mapping_data is None:
raise TypeError("mapping data required for equities")
# write the symbol mapping data.
self._write_df_to_table(
equity_symbol_mappings,
mapping_data,
txn,
chunk_size,
)
else:
raise ValueError(
"asset_type must be in {'future', 'equity'}, got: %s" % asset_type,
)
self._write_df_to_table(tbl, assets, txn, chunk_size)
pd.DataFrame(
{
asset_router.c.sid.name: assets.index.values,
asset_router.c.asset_type.name: asset_type,
}
).to_sql(
asset_router.name,
txn,
if_exists="append",
index=False,
chunksize=chunk_size,
)
def _all_tables_present(self, txn):
"""
Checks if any tables are present in the current assets database.
Parameters
----------
txn : Transaction
The open transaction to check in.
Returns
-------
has_tables : bool
True if any tables are present, otherwise False.
"""
inspector = sa.inspect(txn)
return any(
inspector.has_table(table_name) for table_name in asset_db_table_names
)
def init_db(self, txn=None):
"""Connect to database and create tables.
Parameters
----------
txn : sa.engine.Connection, optional
The transaction block to execute in. If this is not provided, a new
transaction will be started with the engine provided.
Returns
-------
metadata : sa.MetaData
The metadata that describes the new assets db.
"""
with ExitStack() as stack:
if txn is None:
txn = stack.enter_context(self.engine.begin())
tables_already_exist = self._all_tables_present(txn)
# Create the SQL tables if they do not already exist.
metadata.create_all(txn, checkfirst=True)
if tables_already_exist:
check_version_info(txn, version_info, ASSET_DB_VERSION)
else:
write_version_info(txn, version_info, ASSET_DB_VERSION)
def _normalize_equities(self, equities, exchanges):
# HACK: If 'company_name' is provided, map it to asset_name
if "company_name" in equities.columns and "asset_name" not in equities.columns:
equities["asset_name"] = equities["company_name"]
# remap 'file_name' to 'symbol' if provided
if "file_name" in equities.columns:
equities["symbol"] = equities["file_name"]
equities_output = _generate_output_dataframe(
data_subset=equities,
defaults=_equities_defaults,
)
# Split symbols to company_symbols and share_class_symbols
tuple_series = equities_output["symbol"].apply(split_delimited_symbol)
split_symbols = pd.DataFrame(
tuple_series.tolist(),
columns=["company_symbol", "share_class_symbol"],
index=tuple_series.index,
)
equities_output = pd.concat((equities_output, split_symbols), axis=1)
# Upper-case all symbol data
for col in symbol_columns:
equities_output[col] = equities_output[col].str.upper()
# Convert date columns to UNIX Epoch integers (nanoseconds)
for col in ("start_date", "end_date", "first_traded", "auto_close_date"):
equities_output[col] = _dt_to_epoch_ns(equities_output[col])
return _split_symbol_mappings(equities_output, exchanges)
def _normalize_futures(self, futures):
futures_output = _generate_output_dataframe(
data_subset=futures,
defaults=_futures_defaults,
)
for col in ("symbol", "root_symbol"):
futures_output[col] = futures_output[col].str.upper()
for col in (
"start_date",
"end_date",
"first_traded",
"notice_date",
"expiration_date",
"auto_close_date",
):
futures_output[col] = _dt_to_epoch_ns(futures_output[col])
return futures_output
def _normalize_equity_supplementary_mappings(self, mappings):
mappings_output = _generate_output_dataframe(
data_subset=mappings,
defaults=_equity_supplementary_mappings_defaults,
)
for col in ("start_date", "end_date"):
mappings_output[col] = _dt_to_epoch_ns(mappings_output[col])
return mappings_output
def _load_data(
self, equities, futures, exchanges, root_symbols, equity_supplementary_mappings
):
"""
Returns a standard set of pandas.DataFrames bundled as an AssetData
namedtuple: equities, equities_mappings, futures, exchanges,
root_symbols, and equity_supplementary_mappings.
"""
# Set named identifier columns as indices, if provided.
_normalize_index_columns_in_place(
equities=equities,
equity_supplementary_mappings=equity_supplementary_mappings,
futures=futures,
exchanges=exchanges,
root_symbols=root_symbols,
)
futures_output = self._normalize_futures(futures)
equity_supplementary_mappings_output = (
self._normalize_equity_supplementary_mappings(
equity_supplementary_mappings,
)
)
exchanges_output = _generate_output_dataframe(
data_subset=exchanges,
defaults=_exchanges_defaults,
)
equities_output, equities_mappings = self._normalize_equities(
equities,
exchanges_output,
)
root_symbols_output = _generate_output_dataframe(
data_subset=root_symbols,
defaults=_root_symbols_defaults,
)
return AssetData(
equities=equities_output,
equities_mappings=equities_mappings,
futures=futures_output,
exchanges=exchanges_output,
root_symbols=root_symbols_output,
equity_supplementary_mappings=equity_supplementary_mappings_output,
)