# Source code for zipline.data.bcolz_daily_bars

# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
from functools import partial

with warnings.catch_warnings():  # noqa
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    from bcolz import carray, ctable
    import numpy as np

import logging

import pandas as pd

from zipline.data.bar_reader import NoDataAfterDate, NoDataBeforeDate, NoDataOnDate
from zipline.data.session_bars import CurrencyAwareSessionBarReader
from zipline.utils.calendar_utils import get_calendar
from zipline.utils.cli import maybe_show_progress
from zipline.utils.functional import apply
from zipline.utils.input_validation import expect_element
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import float64_dtype, iNaT, uint32_dtype

from ._equities import _compute_row_slices, _read_bcolz_data

# Module-level logger. The name is the fixed string "UsEquityPricing" rather
# than one derived from ``__name__``.
logger = logging.getLogger("UsEquityPricing")

# Names of the four price columns (open/high/low/close).
OHLC = frozenset(["open", "high", "low", "close"])
# Names of every column in the daily bar table, in a fixed order.
US_EQUITY_PRICING_BCOLZ_COLUMNS = (
    "open",
    "high",
    "low",
    "close",
    "volume",
    "day",
    "id",
)

# Largest value representable by an unsigned 32-bit integer.
UINT32_MAX = np.iinfo(np.uint32).max


def check_uint32_safe(value, colname):
    """Reject a value that is at or above the uint32 maximum.

    Parameters
    ----------
    value : number
        The value to check against ``UINT32_MAX``.
    colname : str
        Name of the column ``value`` came from; only used in the error
        message.

    Raises
    ------
    ValueError
        If ``value >= UINT32_MAX``.
    """
    too_large = value >= UINT32_MAX
    if too_large:
        message = "Value %s from column '%s' is too large" % (value, colname)
        raise ValueError(message)


@expect_element(invalid_data_behavior={"warn", "raise", "ignore"})
def winsorise_uint32(df, invalid_data_behavior, column, *columns):
    """Zero out values in the given columns that cannot be stored as uint32.

    ``df`` is modified in place and also returned.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to winsorise.
    invalid_data_behavior : {'warn', 'raise', 'ignore'}
        What to do when data is outside the bounds of a uint32.
        With ``'ignore'`` the checked columns are first passed through
        ``np.nan_to_num``; with ``'warn'`` or ``'raise'`` null entries are
        counted as invalid alongside values above ``UINT32_MAX``.
    column : str
        The name of the first column to check.
    *columns : iterable[str]
        The names of any further columns to check.

    Returns
    -------
    truncated : pd.DataFrame
        ``df`` with values that do not fit into a uint32 zeroed out.

    Raises
    ------
    ValueError
        If ``invalid_data_behavior`` is ``'raise'`` and any invalid value
        is found.
    """
    checked = [column, *columns]
    out_of_bounds = df[checked] > UINT32_MAX

    if invalid_data_behavior == "ignore":
        # No warning or error will be generated in this mode, so NaNs are
        # simply replaced via nan_to_num.
        df[checked] = np.nan_to_num(df[checked])
    else:
        out_of_bounds |= df[checked].isnull()

    flags = out_of_bounds.values
    if flags.any() and invalid_data_behavior != "ignore":
        # Number of offending cells, and the rows that contain them.
        details = (flags.sum(), df[out_of_bounds.any(axis=1)])
        if invalid_data_behavior == "raise":
            raise ValueError("%d values out of bounds for uint32: %r" % details)
        if invalid_data_behavior == "warn":
            warnings.warn(
                "Ignoring %d values because they are out of bounds for"
                " uint32:\n %r" % details,
                stacklevel=3,  # one extra frame for `expect_element`
            )

    df[out_of_bounds] = 0
    return df


class BcolzDailyBarWriter:
    """Class capable of writing daily OHLCV data to disk in a format that can
    be read efficiently by BcolzDailyBarReader.

    Parameters
    ----------
    filename : str
        The location at which we should write our output.
    calendar : zipline.utils.calendar.trading_calendar
        Calendar to use to compute asset calendar offsets.
    start_session: pd.Timestamp
        Midnight UTC session label.
    end_session: pd.Timestamp
        Midnight UTC session label.

    See Also
    --------
    zipline.data.bcolz_daily_bars.BcolzDailyBarReader
    """

    # dtypes handed to ``pd.read_csv`` by ``write_csvs``.
    _csv_dtypes = {
        "open": float64_dtype,
        "high": float64_dtype,
        "low": float64_dtype,
        "close": float64_dtype,
        "volume": float64_dtype,
    }

    def __init__(self, filename, calendar, start_session, end_session):
        self._filename = filename
        # Sessions are stored timezone-naive.
        start_session = start_session.tz_localize(None)
        end_session = end_session.tz_localize(None)

        # Both endpoints are validated against the calendar, except when
        # they are equal.
        if start_session != end_session:
            if not calendar.is_session(start_session):
                raise ValueError("Start session %s is invalid!" % start_session)
            if not calendar.is_session(end_session):
                raise ValueError("End session %s is invalid!" % end_session)

        self._start_session = start_session
        self._end_session = end_session

        self._calendar = calendar

    @property
    def progress_bar_message(self):
        """Label shown next to the progress bar while writing."""
        return "Merging daily equity files:"

    def progress_bar_item_show_func(self, value):
        """Render the current progress item: ``None`` is passed through,
        anything else is shown as ``str(value[0])``.
        """
        return value if value is None else str(value[0])

    def write(
        self, data, assets=None, show_progress=False, invalid_data_behavior="warn"
    ):
        """Convert each chunk of ``data`` to a ctable and write the combined
        table to disk.

        Parameters
        ----------
        data : iterable[tuple[int, pandas.DataFrame or bcolz.ctable]]
            The data chunks to write. Each chunk should be a tuple of sid
            and the data for that asset.
        assets : set[int], optional
            The assets that should be in ``data``. If this is provided
            we will check ``data`` against the assets and provide better
            progress information.
        show_progress : bool, optional
            Whether or not to show a progress bar while writing.
        invalid_data_behavior : {'warn', 'raise', 'ignore'}, optional
            What to do when data is encountered that is outside the range of
            a uint32.

        Returns
        -------
        table : bcolz.ctable
            The newly-written table.
        """
        ctx = maybe_show_progress(
            ((sid, self.to_ctable(df, invalid_data_behavior)) for sid, df in data),
            show_progress=show_progress,
            item_show_func=self.progress_bar_item_show_func,
            label=self.progress_bar_message,
            length=len(assets) if assets is not None else None,
        )
        with ctx as it:
            return self._write_internal(it, assets)

    def write_csvs(self, asset_map, show_progress=False, invalid_data_behavior="warn"):
        """Read CSVs as DataFrames from our asset map.

        Parameters
        ----------
        asset_map : dict[int -> str]
            A mapping from asset id to file path with the CSV data for that
            asset
        show_progress : bool
            Whether or not to show a progress bar while writing.
        invalid_data_behavior : {'warn', 'raise', 'ignore'}
            What to do when data is encountered that is outside the range of
            a uint32.

        Returns
        -------
        The result of ``write`` for the data read from the CSV files.
        """
        read = partial(
            pd.read_csv,
            parse_dates=["day"],
            index_col="day",
            dtype=self._csv_dtypes,
        )
        return self.write(
            ((asset, read(path)) for asset, path in asset_map.items()),
            assets=asset_map.keys(),
            show_progress=show_progress,
            invalid_data_behavior=invalid_data_behavior,
        )

    def _write_internal(self, iterator, assets):
        """Internal implementation of write.

        `iterator` should be an iterator yielding pairs of (asset, ctable).

        Raises
        ------
        ValueError
            If ``assets`` is given and ``iterator`` yields an asset id that
            is not in it.
        AssertionError
            If the number of rows for an asset differs from the number of
            calendar sessions between its first and last day.
        """
        total_rows = 0
        first_row = {}
        last_row = {}
        calendar_offset = {}

        # Maps column name -> output carray.
        columns = {
            k: carray(np.array([], dtype=uint32_dtype))
            for k in US_EQUITY_PRICING_BCOLZ_COLUMNS
        }

        earliest_date = None
        sessions = self._calendar.sessions_in_range(
            self._start_session, self._end_session
        )

        if assets is not None:
            # Rebind ``iterator`` to a generator that yields the same pairs
            # but rejects asset ids missing from ``assets``. ``apply`` calls
            # the function immediately; the defaults capture the current
            # values.

            @apply
            def iterator(iterator=iterator, assets=set(assets)):
                for asset_id, table in iterator:
                    if asset_id not in assets:
                        raise ValueError("unknown asset id %r" % asset_id)
                    yield asset_id, table

        for asset_id, table in iterator:
            nrows = len(table)
            for column_name in columns:
                if column_name == "id":
                    # We know what the content of this column is, so don't
                    # bother reading it.
                    columns["id"].append(
                        np.full((nrows,), asset_id, dtype="uint32"),
                    )
                    continue

                columns[column_name].append(table[column_name])

            # Read the day column once; it is consulted several times below.
            days = table["day"]
            first_day = days[0]

            if earliest_date is None:
                earliest_date = first_day
            else:
                earliest_date = min(earliest_date, first_day)

            # Bcolz doesn't support ints as keys in `attrs`, so convert
            # assets to strings for use as attr keys.
            asset_key = str(asset_id)

            # Calculate the index into the array of the first and last row
            # for this asset. This allows us to efficiently load single
            # assets when querying the data back out of the table.
            first_row[asset_key] = total_rows
            last_row[asset_key] = total_rows + nrows - 1
            total_rows += nrows

            asset_first_day = pd.Timestamp(first_day, unit="s").normalize()
            asset_last_day = pd.Timestamp(days[-1], unit="s").normalize()

            asset_sessions = sessions[
                sessions.slice_indexer(asset_first_day, asset_last_day)
            ]
            if len(table) != len(asset_sessions):
                table_days = pd.to_datetime(np.array(days), unit="s")
                missing_sessions = asset_sessions.difference(table_days).tolist()
                extra_sessions = table_days.difference(asset_sessions).tolist()
                raise AssertionError(
                    f"Got {len(table)} rows for daily bars table with "
                    f"first day={asset_first_day.date()}, last "
                    f"day={asset_last_day.date()}, expected {len(asset_sessions)} rows.\n"
                    f"Missing sessions: {missing_sessions}\nExtra sessions: {extra_sessions}"
                )

            # Calculate the number of trading days between the first date
            # in the stored data and the first date of **this** asset. This
            # offset used for output alignment by the reader.
            calendar_offset[asset_key] = sessions.get_loc(asset_first_day)

        # This writes the table to disk.
        full_table = ctable(
            columns=[columns[colname] for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS],
            names=US_EQUITY_PRICING_BCOLZ_COLUMNS,
            rootdir=self._filename,
            mode="w",
        )

        full_table.attrs["first_trading_day"] = (
            earliest_date if earliest_date is not None else iNaT
        )

        full_table.attrs["first_row"] = first_row
        full_table.attrs["last_row"] = last_row
        full_table.attrs["calendar_offset"] = calendar_offset
        full_table.attrs["calendar_name"] = self._calendar.name
        full_table.attrs["start_session_ns"] = self._start_session.value
        full_table.attrs["end_session_ns"] = self._end_session.value
        full_table.flush()
        return full_table

    @expect_element(invalid_data_behavior={"warn", "raise", "ignore"})
    def to_ctable(self, raw_data, invalid_data_behavior):
        """Convert one asset's data into a ctable of uint32 columns.

        Parameters
        ----------
        raw_data : pandas.DataFrame or bcolz.ctable
            A ctable is returned unchanged. A DataFrame must carry the OHLC
            and ``volume`` columns and is indexed by day. Note that the
            DataFrame is modified in place by ``winsorise_uint32``.
        invalid_data_behavior : {'warn', 'raise', 'ignore'}
            Forwarded to ``winsorise_uint32``.

        Returns
        -------
        bcolz.ctable
            Prices multiplied by 1000 and rounded, the index converted to
            whole seconds in a ``day`` column, and ``volume``, all as
            uint32.
        """
        if isinstance(raw_data, ctable):
            # we already have a ctable so do nothing
            return raw_data

        winsorise_uint32(raw_data, invalid_data_behavior, "volume", *OHLC)
        # NOTE(review): the bounds check above runs on unscaled prices, so a
        # price above UINT32_MAX / 1000 passes it and may not survive the
        # uint32 cast below -- confirm whether that is acceptable.
        processed = (raw_data[list(OHLC)] * 1000).round().astype("uint32")
        dates = raw_data.index.values.astype("datetime64[s]")
        check_uint32_safe(dates.max().view(np.int64), "day")
        processed["day"] = dates.astype("uint32")
        processed["volume"] = raw_data.volume.astype("uint32")
        return ctable.fromdataframe(processed)
class BcolzDailyBarReader(CurrencyAwareSessionBarReader):
    """Reader for raw pricing data written by BcolzDailyBarWriter.

    Parameters
    ----------
    table : bcolz.ctable
        The ctable containing the pricing data, with attrs corresponding to
        the Attributes list below.
    read_all_threshold : int
        The number of equities at which;
        below, the data is read by reading a slice from the carray
        per asset.
        above, the data is read by pulling all of the data for all assets
        into memory and then indexing into that array for each day and
        asset pair.
        Used to tune performance of reads when using a small or large
        number of equities.

    Attributes
    ----------
    The table with which this loader interacts contains the following
    attributes:

    first_row : dict
        Map from asset_id -> index of first row in the dataset with that id.
    last_row : dict
        Map from asset_id -> index of last row in the dataset with that id.
    calendar_offset : dict
        Map from asset_id -> calendar index of first row.
    start_session_ns: int
        Epoch ns of the first session used in this dataset.
    end_session_ns: int
        Epoch ns of the last session used in this dataset.
    calendar_name: str
        String identifier of trading calendar used (ie, "NYSE").

    We use first_row and last_row together to quickly find ranges of rows to
    load when reading an asset's data into memory.

    We use calendar_offset and calendar to orient loaded blocks within a
    range of queried dates.

    Notes
    -----
    A Bcolz CTable is comprised of Columns and Attributes.
    The table with which this loader interacts contains the following columns:

    ['open', 'high', 'low', 'close', 'volume', 'day', 'id'].

    The data in these columns is interpreted as follows:

    - Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 *
      as-traded dollar value.
    - Volume is interpreted as as-traded volume.
    - Day is interpreted as seconds since midnight UTC, Jan 1, 1970.
    - Id is the asset id of the row.

    The data in each column is grouped by asset and then sorted by day within
    each asset block.

    The table is built to represent a long time range of data, e.g. ten years
    of equity data, so the lengths of each asset block is not equal to each
    other. The blocks are clipped to the known start and end date of each asset
    to cut down on the number of empty values that would need to be included to
    make a regular/cubic dataset.

    When read across the open, high, low, close, and volume with the same
    index should represent the same asset and day.

    See Also
    --------
    zipline.data.bcolz_daily_bars.BcolzDailyBarWriter
    """

    def __init__(self, table, read_all_threshold=3000):
        # ``table`` may be a ctable or something ``ctable(rootdir=...)``
        # accepts; it is resolved lazily by ``_table``.
        self._maybe_table_rootdir = table
        # Cache of fully read np.array for the carrays in the daily bar table.
        # raw_array does not use the same cache, but it could.
        # Need to test keeping the entire array in memory for the course of a
        # process first.
        self._spot_cols = {}

        self.PRICE_ADJUSTMENT_FACTOR = 0.001
        self._read_all_threshold = read_all_threshold

    @lazyval
    def _table(self):
        """The underlying ctable, opened read-only from its rootdir unless a
        ctable was passed to the constructor.
        """
        maybe_table_rootdir = self._maybe_table_rootdir
        if isinstance(maybe_table_rootdir, ctable):
            return maybe_table_rootdir
        return ctable(rootdir=maybe_table_rootdir, mode="r")

    @lazyval
    def sessions(self):
        """The sessions covered by the table.

        Taken from the ``calendar`` attr when present, otherwise computed
        from the named calendar and the stored start/end session.
        """
        if "calendar" in self._table.attrs.attrs:
            # backwards compatibility with old formats, will remove
            return pd.DatetimeIndex(self._table.attrs["calendar"])
        else:
            cal = get_calendar(self._table.attrs["calendar_name"])
            start_session_ns = self._table.attrs["start_session_ns"]
            start_session = pd.Timestamp(start_session_ns)

            end_session_ns = self._table.attrs["end_session_ns"]
            end_session = pd.Timestamp(end_session_ns)

            sessions = cal.sessions_in_range(start_session, end_session)

            return sessions

    @lazyval
    def _first_rows(self):
        """Map from int asset id -> index of that asset's first row."""
        return {
            int(asset_id): start_index
            for asset_id, start_index in self._table.attrs["first_row"].items()
        }

    @lazyval
    def _last_rows(self):
        """Map from int asset id -> index of that asset's last row."""
        return {
            int(asset_id): end_index
            for asset_id, end_index in self._table.attrs["last_row"].items()
        }

    @lazyval
    def _calendar_offsets(self):
        """Map from int asset id -> calendar index of that asset's first row."""
        return {
            int(id_): offset
            for id_, offset in self._table.attrs["calendar_offset"].items()
        }

    @lazyval
    def first_trading_day(self):
        """The ``first_trading_day`` attr as a Timestamp, or None if the
        table has no such attr.
        """
        try:
            return pd.Timestamp(self._table.attrs["first_trading_day"], unit="s")
        except KeyError:
            return None

    @lazyval
    def trading_calendar(self):
        """The calendar named by the ``calendar_name`` attr, or None if the
        table has no such attr.
        """
        if "calendar_name" in self._table.attrs.attrs:
            return get_calendar(self._table.attrs["calendar_name"])
        else:
            return None

    @property
    def last_available_dt(self):
        """The last entry of ``sessions``."""
        return self.sessions[-1]

    def _compute_slices(self, start_idx, end_idx, assets):
        """Compute the raw row indices to load for each asset on a query for the
        given dates after applying a shift.

        Parameters
        ----------
        start_idx : int
            Index of first date for which we want data.
        end_idx : int
            Index of last date for which we want data.
        assets : pandas.Int64Index
            Assets for which we want to compute row indices

        Returns
        -------
        A 3-tuple of (first_rows, last_rows, offsets):
        first_rows : np.array[intp]
            Array with length == len(assets) containing the index of the first
            row to load for each asset in `assets`.
        last_rows : np.array[intp]
            Array with length == len(assets) containing the index of the last
            row to load for each asset in `assets`.
        offset : np.array[intp]
            Array with length == len(assets) containing the index in a buffer
            of length `dates` corresponding to the first row of each asset.

            The value of offset[i] will be 0 if asset[i] existed at the start
            of a query. Otherwise, offset[i] will be equal to the number of
            entries in `dates` for which the asset did not yet exist.
        """
        # The core implementation of the logic here is implemented in Cython
        # for efficiency.
        return _compute_row_slices(
            self._first_rows,
            self._last_rows,
            self._calendar_offsets,
            start_idx,
            end_idx,
            assets,
        )

    def load_raw_arrays(self, columns, start_date, end_date, assets):
        """Read ``columns`` for ``assets`` between ``start_date`` and
        ``end_date`` (inclusive) via ``_read_bcolz_data``.

        Raises
        ------
        NoDataOnDate
            If ``start_date`` or ``end_date`` is not in ``sessions``.
        """
        start_idx = self._load_raw_arrays_date_to_index(start_date)
        end_idx = self._load_raw_arrays_date_to_index(end_date)

        first_rows, last_rows, offsets = self._compute_slices(
            start_idx,
            end_idx,
            assets,
        )
        # Above the threshold, whole columns are read instead of per-asset
        # slices.
        read_all = len(assets) > self._read_all_threshold
        return _read_bcolz_data(
            self._table,
            (end_idx - start_idx + 1, len(assets)),
            list(columns),
            first_rows,
            last_rows,
            offsets,
            read_all,
        )

    def _load_raw_arrays_date_to_index(self, date):
        """Return the position of ``date`` in ``sessions``, raising
        NoDataOnDate if it is not a session.
        """
        try:
            # TODO get_loc is deprecated but get_indexer doesn't raise an error
            return self.sessions.get_loc(date)
        except KeyError as exc:
            raise NoDataOnDate(date) from exc

    def _spot_col(self, colname):
        """Get the colname from daily_bar_table and read all of it into memory,
        caching the result.

        Parameters
        ----------
        colname : string
            A name of a OHLCV carray in the daily_bar_table

        Returns
        -------
        array (uint32)
            Full read array of the carray in the daily_bar_table with the
            given colname.
        """
        try:
            col = self._spot_cols[colname]
        except KeyError:
            col = self._spot_cols[colname] = self._table[colname]
        return col

    def get_last_traded_dt(self, asset, day):
        """Return the most recent session on or before ``day`` on which
        ``asset`` has a non-zero volume.

        Parameters
        ----------
        asset : int
            The asset identifier, as accepted by ``sid_day_index``.
        day : datetime64-like
            The session to start searching backwards from.

        Returns
        -------
        The matching session, or ``pd.NaT`` when the search reaches a day
        before the asset's data, runs out of sessions, or ``day`` is not a
        session.
        """
        volumes = self._spot_col("volume")

        search_day = day

        while True:
            try:
                ix = self.sid_day_index(asset, search_day)
            except NoDataBeforeDate:
                return pd.NaT
            except NoDataAfterDate:
                # The asset's data ends before search_day: step back one
                # session and retry.
                prev_day_ix = self.sessions.get_loc(search_day) - 1
                if prev_day_ix > -1:
                    search_day = self.sessions[prev_day_ix]
                    continue
                # No earlier session exists; without this exit the loop
                # could neither advance nor read a valid row index.
                return pd.NaT
            except NoDataOnDate:
                return pd.NaT
            if volumes[ix] != 0:
                return search_day
            prev_day_ix = self.sessions.get_loc(search_day) - 1
            if prev_day_ix > -1:
                search_day = self.sessions[prev_day_ix]
            else:
                return pd.NaT

    def sid_day_index(self, sid, day):
        """Locate the row holding the data for ``sid`` on ``day``.

        Parameters
        ----------
        sid : int
            The asset identifier.
        day : datetime64-like
            Midnight of the day for which data is requested.

        Returns
        -------
        int
            Index into the data tape for the given sid and day.

        Raises
        ------
        NoDataOnDate
            If ``day`` cannot be located in ``sessions``.
        NoDataBeforeDate
            If ``day`` is before the first row stored for ``sid``.
        NoDataAfterDate
            If ``day`` is after the last row stored for ``sid``.
        """
        try:
            day_loc = self.sessions.get_loc(day)
        except Exception as exc:
            raise NoDataOnDate(
                "day={0} is outside of calendar={1}".format(day, self.sessions)
            ) from exc
        offset = day_loc - self._calendar_offsets[sid]
        if offset < 0:
            raise NoDataBeforeDate(
                "No data on or before day={0} for sid={1}".format(day, sid)
            )
        ix = self._first_rows[sid] + offset
        if ix > self._last_rows[sid]:
            raise NoDataAfterDate(
                "No data on or after day={0} for sid={1}".format(day, sid)
            )
        return ix

    def get_value(self, sid, dt, field):
        """Return the stored value of ``field`` for ``sid`` on ``dt``.

        Parameters
        ----------
        sid : int
            The asset identifier.
        dt : datetime64-like
            Midnight of the day for which data is requested.
        field : string
            The price field.
            e.g. ('open', 'high', 'low', 'close', 'volume')

        Returns
        -------
        float
            For ``'volume'`` the stored value is returned unchanged.
            For any other field the stored value multiplied by 0.001, or
            ``np.nan`` if the stored value is 0.

        Raises
        ------
        NoDataOnDate, NoDataBeforeDate, NoDataAfterDate
            Propagated from ``sid_day_index`` when there is no row for the
            given sid and day.
        """
        ix = self.sid_day_index(sid, dt)
        price = self._spot_col(field)[ix]
        if field != "volume":
            if price == 0:
                return np.nan
            else:
                return price * 0.001
        else:
            return price

    def currency_codes(self, sids):
        """Return an object array with ``"USD"`` for every sid present in the
        table and ``None`` for every other sid.
        """
        # XXX: This is pretty inefficient. This reader doesn't really support
        # country codes, so we always either return USD or None if we don't
        # know about the sid at all.
        first_rows = self._first_rows
        out = []
        for sid in sids:
            if sid in first_rows:
                out.append("USD")
            else:
                out.append(None)
        return np.array(out, dtype=object)