Source code for zipline.data.adjustments

import logging
import sqlite3
from collections import namedtuple
from errno import ENOENT
from os import remove

import numpy as np
import pandas as pd
from numpy import integer as any_integer

from zipline.utils.functional import keysorted
from zipline.utils.input_validation import preprocess
from zipline.utils.numpy_utils import (
    datetime64ns_dtype,
    float64_dtype,
    int64_dtype,
    uint32_dtype,
    uint64_dtype,
)
from zipline.utils.pandas_utils import empty_dataframe
from zipline.utils.sqlite_utils import coerce_string_to_conn, group_into_chunks

from ._adjustments import load_adjustments_from_sqlite

log = logging.getLogger(__name__)

SQLITE_ADJUSTMENT_TABLENAMES = frozenset(["splits", "dividends", "mergers"])

UNPAID_QUERY_TEMPLATE = """
SELECT sid, amount, pay_date from dividend_payouts
WHERE ex_date=? AND sid IN ({0})
"""

Dividend = namedtuple("Dividend", ["asset", "amount", "pay_date"])

UNPAID_STOCK_DIVIDEND_QUERY_TEMPLATE = """
SELECT sid, payment_sid, ratio, pay_date from stock_dividend_payouts
WHERE ex_date=? AND sid IN ({0})
"""

StockDividend = namedtuple(
    "StockDividend",
    ["asset", "payment_asset", "ratio", "pay_date"],
)

SQLITE_ADJUSTMENT_COLUMN_DTYPES = {
    "effective_date": any_integer,
    "ratio": float64_dtype,
    "sid": any_integer,
}

SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES = {
    "sid": any_integer,
    "ex_date": any_integer,
    "declared_date": any_integer,
    "record_date": any_integer,
    "pay_date": any_integer,
    "amount": float,
}

SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES = {
    "sid": any_integer,
    "ex_date": any_integer,
    "declared_date": any_integer,
    "record_date": any_integer,
    "pay_date": any_integer,
    "payment_sid": any_integer,
    "ratio": float,
}


def specialize_any_integer(d):
    out = {}
    for k, v in d.items():
        if v is any_integer:
            out[k] = int64_dtype
        else:
            out[k] = v
    return out
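
# For example, specialize_any_integer(SQLITE_ADJUSTMENT_COLUMN_DTYPES) maps
# "effective_date" and "sid" to int64_dtype while leaving "ratio" as
# float64_dtype.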


class SQLiteAdjustmentReader:
    """Loads adjustments based on corporate actions from a SQLite database.

    Expects data written in the format output by `SQLiteAdjustmentWriter`.

    Parameters
    ----------
    conn : str or sqlite3.Connection
        Connection from which to load data.

    See Also
    --------
    :class:`zipline.data.adjustments.SQLiteAdjustmentWriter`
    """

    _datetime_int_cols = {
        "splits": ("effective_date",),
        "mergers": ("effective_date",),
        "dividends": ("effective_date",),
        "dividend_payouts": (
            "declared_date",
            "ex_date",
            "pay_date",
            "record_date",
        ),
        "stock_dividend_payouts": (
            "declared_date",
            "ex_date",
            "pay_date",
            "record_date",
        ),
    }
    _raw_table_dtypes = {
        # We use any_integer above to be lenient in accepting different dtypes
        # from users. For our outputs, however, we always want to return the
        # same types, and any_integer turns into int32 on some numpy windows
        # builds, so specify int64 explicitly here.
        "splits": specialize_any_integer(SQLITE_ADJUSTMENT_COLUMN_DTYPES),
        "mergers": specialize_any_integer(SQLITE_ADJUSTMENT_COLUMN_DTYPES),
        "dividends": specialize_any_integer(SQLITE_ADJUSTMENT_COLUMN_DTYPES),
        "dividend_payouts": specialize_any_integer(
            SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES,
        ),
        "stock_dividend_payouts": specialize_any_integer(
            SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES,
        ),
    }

    @preprocess(conn=coerce_string_to_conn(require_exists=True))
    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        return self.conn.close()
    def load_adjustments(
        self,
        dates,
        assets,
        should_include_splits,
        should_include_mergers,
        should_include_dividends,
        adjustment_type,
    ):
        """Load collection of Adjustment objects from underlying adjustments
        db.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            Dates for which adjustments are needed.
        assets : pd.Int64Index
            Assets for which adjustments are needed.
        should_include_splits : bool
            Whether split adjustments should be included.
        should_include_mergers : bool
            Whether merger adjustments should be included.
        should_include_dividends : bool
            Whether dividend adjustments should be included.
        adjustment_type : str
            Whether price adjustments, volume adjustments, or both, should be
            included in the output.

        Returns
        -------
        adjustments : dict[str -> dict[int -> Adjustment]]
            A dictionary containing price and/or volume adjustment mappings
            from index to adjustment objects to apply at that index.
        """
        dates = dates.tz_localize("UTC")
        return load_adjustments_from_sqlite(
            self.conn,
            dates,
            assets,
            should_include_splits,
            should_include_mergers,
            should_include_dividends,
            adjustment_type,
        )
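    # A minimal call sketch (the dates and assets below are hypothetical):
    #
    #     adjustments = reader.load_adjustments(
    #         dates=pd.date_range("2020-01-02", "2020-01-10", freq="B"),
    #         assets=pd.Index([1, 2, 3]),
    #         should_include_splits=True,
    #         should_include_mergers=True,
    #         should_include_dividends=True,
    #         adjustment_type="all",
    #     )
    #     price_adjustments = adjustments.get("price")
    #     volume_adjustments = adjustments.get("volume")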
    def load_pricing_adjustments(self, columns, dates, assets):
        if "volume" not in set(columns):
            adjustment_type = "price"
        elif len(set(columns)) == 1:
            adjustment_type = "volume"
        else:
            adjustment_type = "all"

        adjustments = self.load_adjustments(
            dates,
            assets,
            should_include_splits=True,
            should_include_mergers=True,
            should_include_dividends=True,
            adjustment_type=adjustment_type,
        )
        price_adjustments = adjustments.get("price")
        volume_adjustments = adjustments.get("volume")

        return [
            volume_adjustments if column == "volume" else price_adjustments
            for column in columns
        ]

    def get_adjustments_for_sid(self, table_name, sid):
        t = (sid,)
        c = self.conn.cursor()
        adjustments_for_sid = c.execute(
            "SELECT effective_date, ratio FROM %s WHERE sid = ?" % table_name, t
        ).fetchall()
        c.close()

        return [
            [pd.Timestamp(adjustment[0], unit="s"), adjustment[1]]
            for adjustment in adjustments_for_sid
        ]

    def get_dividends_with_ex_date(self, assets, date, asset_finder):
        seconds = date.value / int(1e9)
        c = self.conn.cursor()

        divs = []
        for chunk in group_into_chunks(assets):
            query = UNPAID_QUERY_TEMPLATE.format(",".join(["?" for _ in chunk]))
            t = (seconds,) + tuple(map(lambda x: int(x), chunk))

            c.execute(query, t)

            rows = c.fetchall()
            for row in rows:
                div = Dividend(
                    asset_finder.retrieve_asset(row[0]),
                    row[1],
                    pd.Timestamp(row[2], unit="s", tz="UTC"),
                )
                divs.append(div)
        c.close()

        return divs

    def get_stock_dividends_with_ex_date(self, assets, date, asset_finder):
        seconds = date.value / int(1e9)
        c = self.conn.cursor()

        stock_divs = []
        for chunk in group_into_chunks(assets):
            query = UNPAID_STOCK_DIVIDEND_QUERY_TEMPLATE.format(
                ",".join(["?" for _ in chunk])
            )
            t = (seconds,) + tuple(map(lambda x: int(x), chunk))

            c.execute(query, t)

            rows = c.fetchall()

            for row in rows:
                stock_div = StockDividend(
                    asset_finder.retrieve_asset(row[0]),  # asset
                    asset_finder.retrieve_asset(row[1]),  # payment_asset
                    row[2],
                    pd.Timestamp(row[3], unit="s", tz="UTC"),
                )
                stock_divs.append(stock_div)
        c.close()

        return stock_divs
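    # Both lookup methods above decode epoch-second dates back into UTC
    # pd.Timestamps and resolve sids through the asset_finder. For example, a
    # hypothetical dividend_payouts row (1, 0.50, 1590969600) becomes
    # Dividend(asset, 0.50, Timestamp("2020-06-01 00:00:00+0000")).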
    def unpack_db_to_component_dfs(self, convert_dates=False):
        """Returns the set of known tables in the adjustments file in
        DataFrame form.

        Parameters
        ----------
        convert_dates : bool, optional
            By default, dates are returned in seconds since EPOCH. If
            convert_dates is True, all ints in date columns will be converted
            to datetimes.

        Returns
        -------
        dfs : dict{str->DataFrame}
            Dictionary which maps table name to the corresponding DataFrame
            version of the table, where all date columns have been coerced
            back from int to datetime.
        """
        return {
            t_name: self.get_df_from_table(t_name, convert_dates)
            for t_name in self._datetime_int_cols
        }
    def get_df_from_table(self, table_name, convert_dates=False):
        try:
            date_cols = self._datetime_int_cols[table_name]
        except KeyError as exc:
            raise ValueError(
                f"Requested table {table_name} not found.\n"
                f"Available tables: {self._datetime_int_cols.keys()}\n"
            ) from exc

        # Dates are stored in second resolution as ints in adj.db tables.
        kwargs = (
            # {"parse_dates": {col: {"unit": "s", "utc": True} for col in date_cols}}
            {"parse_dates": {col: {"unit": "s"} for col in date_cols}}
            if convert_dates
            else {}
        )

        result = pd.read_sql(
            f"select * from {table_name}",
            self.conn,
            index_col="index",
            **kwargs,
        )
        dtypes = self._df_dtypes(table_name, convert_dates)

        if not len(result):
            return empty_dataframe(*keysorted(dtypes))

        result.rename_axis(None, inplace=True)
        result = result[sorted(dtypes)]  # ensure expected order of columns
        return result

    def _df_dtypes(self, table_name, convert_dates):
        """Get dtypes to use when unpacking sqlite tables as dataframes."""
        out = self._raw_table_dtypes[table_name]
        if convert_dates:
            out = out.copy()
            for date_column in self._datetime_int_cols[table_name]:
                out[date_column] = datetime64ns_dtype

        return out
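
# Example usage of the reader (a minimal sketch; "adjustments.sqlite" is a
# hypothetical path to a database produced by SQLiteAdjustmentWriter):
#
#     with SQLiteAdjustmentReader("adjustments.sqlite") as reader:
#         dfs = reader.unpack_db_to_component_dfs(convert_dates=True)
#         splits = dfs["splits"]  # columns: effective_date, ratio, sid
#         payouts = dfs["dividend_payouts"]
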
class SQLiteAdjustmentWriter:
    """Writer for data to be read by SQLiteAdjustmentReader

    Parameters
    ----------
    conn_or_path : str or sqlite3.Connection
        A handle to the target sqlite database.
    equity_daily_bar_reader : SessionBarReader
        Daily bar reader to use for dividend writes.
    overwrite : bool, optional, default=False
        If True and conn_or_path is a string, remove any existing files at the
        given path before connecting.

    See Also
    --------
    zipline.data.adjustments.SQLiteAdjustmentReader
    """

    def __init__(self, conn_or_path, equity_daily_bar_reader, overwrite=False):
        if isinstance(conn_or_path, sqlite3.Connection):
            self.conn = conn_or_path
        elif isinstance(conn_or_path, str):
            if overwrite:
                try:
                    remove(conn_or_path)
                except OSError as e:
                    if e.errno != ENOENT:
                        raise
            self.conn = sqlite3.connect(conn_or_path)
            self.uri = conn_or_path
        else:
            raise TypeError("Unknown connection type %s" % type(conn_or_path))

        self._equity_daily_bar_reader = equity_daily_bar_reader

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        self.conn.close()

    def _write(self, tablename, expected_dtypes, frame):
        if frame is None or frame.empty:
            # keeping the dtypes correct for empty frames is not easy
            # frame = pd.DataFrame(
            #     np.array([], dtype=list(expected_dtypes.items())),
            # )
            frame = pd.DataFrame(expected_dtypes, index=[])
        else:
            if frozenset(frame.columns) != frozenset(expected_dtypes):
                raise ValueError(
                    "Unexpected frame columns:\n"
                    "Expected Columns: %s\n"
                    "Received Columns: %s"
                    % (
                        set(expected_dtypes),
                        frame.columns.tolist(),
                    )
                )

            actual_dtypes = frame.dtypes
            for colname, expected in expected_dtypes.items():
                actual = actual_dtypes[colname]
                if not np.issubdtype(actual, expected):
                    raise TypeError(
                        "Expected data of type {expected} for column"
                        " '{colname}', but got '{actual}'.".format(
                            expected=expected,
                            colname=colname,
                            actual=actual,
                        ),
                    )

        frame.to_sql(
            tablename,
            self.conn,
            if_exists="append",
            chunksize=50000,
        )

    def write_frame(self, tablename, frame):
        if tablename not in SQLITE_ADJUSTMENT_TABLENAMES:
            raise ValueError(
                f"Adjustment table {tablename} not in {SQLITE_ADJUSTMENT_TABLENAMES}"
            )
        if not (frame is None or frame.empty):
            frame = frame.copy()
            frame["effective_date"] = (
                frame["effective_date"]
                .values.astype(
                    "datetime64[s]",
                )
                .astype("int64")
            )
        return self._write(
            tablename,
            SQLITE_ADJUSTMENT_COLUMN_DTYPES,
            frame,
        )
    def write_dividend_payouts(self, frame):
        """Write dividend payout data to SQLite table `dividend_payouts`."""
        return self._write(
            "dividend_payouts",
            SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES,
            frame,
        )
    def write_stock_dividend_payouts(self, frame):
        return self._write(
            "stock_dividend_payouts",
            SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES,
            frame,
        )
    def calc_dividend_ratios(self, dividends):
        """Calculate the ratios to apply to equities when looking back at
        pricing history so that the price is smoothed over the ex_date, when
        the market adjusts to the change in equity value due to the upcoming
        dividend.

        Returns
        -------
        DataFrame
            A frame in the same format as splits and mergers, with keys

            - sid, the id of the equity
            - effective_date, the date, in seconds, on which to apply the
              ratio.
            - ratio, the ratio to apply to backwards-looking pricing data.
        """
        if dividends is None or dividends.empty:
            return pd.DataFrame(
                np.array(
                    [],
                    dtype=[
                        ("sid", uint64_dtype),
                        ("effective_date", uint32_dtype),
                        ("ratio", float64_dtype),
                    ],
                )
            )

        pricing_reader = self._equity_daily_bar_reader
        input_sids = dividends.sid.values
        unique_sids, sids_ix = np.unique(input_sids, return_inverse=True)
        dates = pricing_reader.sessions.values

        (close,) = pricing_reader.load_raw_arrays(
            ["close"],
            pd.Timestamp(dates[0]),
            pd.Timestamp(dates[-1]),
            unique_sids,
        )
        date_ix = np.searchsorted(dates, dividends.ex_date.values)
        mask = date_ix > 0

        date_ix = date_ix[mask]
        sids_ix = sids_ix[mask]
        input_dates = dividends.ex_date.values[mask]

        # subtract one day to get the close on the day prior to the ex_date
        previous_close = close[date_ix - 1, sids_ix]
        input_sids = input_sids[mask]

        amount = dividends.amount.values[mask]
        ratio = 1.0 - amount / previous_close

        non_nan_ratio_mask = ~np.isnan(ratio)
        for ix in np.flatnonzero(~non_nan_ratio_mask):
            log.warning(
                "Couldn't compute ratio for dividend"
                " sid=%(sid)s, ex_date=%(ex_date)s, amount=%(amount).3f",
                {
                    "sid": input_sids[ix],
                    "ex_date": pd.Timestamp(input_dates[ix]).strftime("%Y-%m-%d"),
                    "amount": amount[ix],
                },
            )

        positive_ratio_mask = ratio > 0
        for ix in np.flatnonzero(~positive_ratio_mask & non_nan_ratio_mask):
            log.warning(
                "Dividend ratio <= 0 for dividend"
                " sid=%(sid)s, ex_date=%(ex_date)s, amount=%(amount).3f",
                {
                    "sid": input_sids[ix],
                    "ex_date": pd.Timestamp(input_dates[ix]).strftime("%Y-%m-%d"),
                    "amount": amount[ix],
                },
            )

        valid_ratio_mask = non_nan_ratio_mask & positive_ratio_mask
        return pd.DataFrame(
            {
                "sid": input_sids[valid_ratio_mask],
                "effective_date": input_dates[valid_ratio_mask],
                "ratio": ratio[valid_ratio_mask],
            }
        )
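    # Worked example of the ratio formula above (values are hypothetical):
    # if the close on the session before the ex_date is 100.0 and the cash
    # amount is 2.50, then ratio = 1.0 - 2.50 / 100.0 = 0.975, so all prices
    # before the ex_date are multiplied by 0.975.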
    def _write_dividends(self, dividends):
        if dividends is None:
            dividend_payouts = None
        else:
            dividend_payouts = dividends.copy()
            # TODO: Check if that's the right place for this fix for pandas > 1.2.5
            dividend_payouts.fillna(np.datetime64("NaT"), inplace=True)
            dividend_payouts["ex_date"] = (
                dividend_payouts["ex_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
            dividend_payouts["record_date"] = (
                dividend_payouts["record_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
            dividend_payouts["declared_date"] = (
                dividend_payouts["declared_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
            dividend_payouts["pay_date"] = (
                dividend_payouts["pay_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )

        self.write_dividend_payouts(dividend_payouts)

    def _write_stock_dividends(self, stock_dividends):
        if stock_dividends is None:
            stock_dividend_payouts = None
        else:
            stock_dividend_payouts = stock_dividends.copy()
            stock_dividend_payouts["ex_date"] = (
                stock_dividend_payouts["ex_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
            stock_dividend_payouts["record_date"] = (
                stock_dividend_payouts["record_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
            stock_dividend_payouts["declared_date"] = (
                stock_dividend_payouts["declared_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
            stock_dividend_payouts["pay_date"] = (
                stock_dividend_payouts["pay_date"]
                .values.astype("datetime64[s]")
                .astype(int64_dtype)
            )
        self.write_stock_dividend_payouts(stock_dividend_payouts)
    def write_dividend_data(self, dividends, stock_dividends=None):
        """Write both dividend payouts and the derived price adjustment
        ratios.
        """
        # First, write the dividend payouts.
        self._write_dividends(dividends)
        self._write_stock_dividends(stock_dividends)

        # Second, from the dividend payouts, calculate ratios.
        dividend_ratios = self.calc_dividend_ratios(dividends)
        self.write_frame("dividends", dividend_ratios)
    def write(self, splits=None, mergers=None, dividends=None, stock_dividends=None):
        """Writes data to a SQLite file to be read by SQLiteAdjustmentReader.

        Parameters
        ----------
        splits : pandas.DataFrame, optional
            Dataframe containing split data. The format of this dataframe is:

              effective_date : int
                  The date, represented as seconds since Unix epoch, on which
                  the adjustment should be applied.
              ratio : float
                  A value to apply to all data earlier than the effective
                  date. For open, high, low, and close those values are
                  multiplied by the ratio. Volume is divided by this value.
              sid : int
                  The asset id associated with this adjustment.
        mergers : pandas.DataFrame, optional
            DataFrame containing merger data. The format of this dataframe is:

              effective_date : int
                  The date, represented as seconds since Unix epoch, on which
                  the adjustment should be applied.
              ratio : float
                  A value to apply to all data earlier than the effective
                  date. For open, high, low, and close those values are
                  multiplied by the ratio. Volume is unaffected.
              sid : int
                  The asset id associated with this adjustment.
        dividends : pandas.DataFrame, optional
            DataFrame containing dividend data. The format of the dataframe
            is:

              sid : int
                  The asset id associated with this adjustment.
              ex_date : datetime64
                  The date on which an equity must be held to be eligible to
                  receive payment.
              declared_date : datetime64
                  The date on which the dividend is announced to the public.
              pay_date : datetime64
                  The date on which the dividend is distributed.
              record_date : datetime64
                  The date on which the stock ownership is checked to
                  determine distribution of dividends.
              amount : float
                  The cash amount paid for each share.

            Dividend ratios are calculated as:
            ``1.0 - (dividend_value / "close on day prior to ex_date")``
        stock_dividends : pandas.DataFrame, optional
            DataFrame containing stock dividend data. The format of the
            dataframe is:

              sid : int
                  The asset id associated with this adjustment.
              ex_date : datetime64
                  The date on which an equity must be held to be eligible to
                  receive payment.
              declared_date : datetime64
                  The date on which the dividend is announced to the public.
              pay_date : datetime64
                  The date on which the dividend is distributed.
              record_date : datetime64
                  The date on which the stock ownership is checked to
                  determine distribution of dividends.
              payment_sid : int
                  The asset id of the shares that should be paid instead of
                  cash.
              ratio : float
                  The ratio of currently held shares in the held sid that
                  should be paid with new shares of the payment_sid.

        See Also
        --------
        zipline.data.adjustments.SQLiteAdjustmentReader
        """
        self.write_frame("splits", splits)
        self.write_frame("mergers", mergers)
        self.write_dividend_data(dividends, stock_dividends)

        # Use IF NOT EXISTS here to allow multiple writes if desired.
        self.conn.execute("CREATE INDEX IF NOT EXISTS splits_sids ON splits(sid)")
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS splits_effective_date"
            " ON splits(effective_date)"
        )
        self.conn.execute("CREATE INDEX IF NOT EXISTS mergers_sids ON mergers(sid)")
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS mergers_effective_date"
            " ON mergers(effective_date)"
        )
        self.conn.execute("CREATE INDEX IF NOT EXISTS dividends_sid ON dividends(sid)")
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS dividends_effective_date"
            " ON dividends(effective_date)"
        )
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS dividend_payouts_sid"
            " ON dividend_payouts(sid)"
        )
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS dividend_payouts_ex_date"
            " ON dividend_payouts(ex_date)"
        )
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS stock_dividend_payouts_sid"
            " ON stock_dividend_payouts(sid)"
        )
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS stock_dividend_payouts_ex_date"
            " ON stock_dividend_payouts(ex_date)"
        )
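
# Example usage of the writer (a minimal sketch; the path, bar_reader, and the
# split row below are all hypothetical):
#
#     splits = pd.DataFrame(
#         {
#             "effective_date": [int(pd.Timestamp("2020-06-01").timestamp())],
#             "ratio": [0.5],  # a 2-for-1 split halves all prior prices
#             "sid": [1],
#         }
#     )
#     with SQLiteAdjustmentWriter("adjustments.sqlite", bar_reader) as writer:
#         writer.write(splits=splits)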