Source code for zipline.pipeline.loaders.equity_pricing_loader

# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict

from interface import implements
from numpy import iinfo, uint32, multiply

from zipline.data.fx import ExplodingFXRateReader
from zipline.lib.adjusted_array import AdjustedArray
from zipline.utils.numpy_utils import repeat_first_axis

from .base import PipelineLoader
from .utils import shift_dates
from ..data.equity_pricing import EquityPricing

UINT32_MAX = iinfo(uint32).max


[docs]class EquityPricingLoader(implements(PipelineLoader)): """A PipelineLoader for loading daily OHLCV data. Parameters ---------- raw_price_reader : zipline.data.session_bars.SessionBarReader Reader providing raw prices. adjustments_reader : zipline.data.adjustments.SQLiteAdjustmentReader Reader providing price/volume adjustments. fx_reader : zipline.data.fx.FXRateReader Reader providing currency conversions. """
[docs] def __init__(self, raw_price_reader, adjustments_reader, fx_reader): self.raw_price_reader = raw_price_reader self.adjustments_reader = adjustments_reader self.fx_reader = fx_reader
@classmethod def without_fx(cls, raw_price_reader, adjustments_reader): """ Construct an EquityPricingLoader without support for fx rates. The returned loader will raise an error if requested to load currency-converted columns. Parameters ---------- raw_price_reader : zipline.data.session_bars.SessionBarReader Reader providing raw prices. adjustments_reader : zipline.data.adjustments.SQLiteAdjustmentReader Reader providing price/volume adjustments. Returns ------- loader : EquityPricingLoader A loader that can only provide currency-naive data. """ return cls( raw_price_reader=raw_price_reader, adjustments_reader=adjustments_reader, fx_reader=ExplodingFXRateReader(), ) def load_adjusted_array(self, domain, columns, dates, sids, mask): # load_adjusted_array is called with dates on which the user's algo # will be shown data, which means we need to return the data that would # be known at the **start** of each date. We assume that the latest # data known on day N is the data from day (N - 1), so we shift all # query dates back by a trading session. sessions = domain.sessions() shifted_dates = shift_dates(sessions, dates[0], dates[-1], shift=1) ohlcv_cols, currency_cols = self._split_column_types(columns) del columns # From here on we should use ohlcv_cols or currency_cols. ohlcv_colnames = [c.name for c in ohlcv_cols] raw_ohlcv_arrays = self.raw_price_reader.load_raw_arrays( ohlcv_colnames, shifted_dates[0], shifted_dates[-1], sids, ) # Currency convert raw_arrays in place if necessary. We use shifted # dates to load currency conversion rates to make them line up with # dates used to fetch prices. self._inplace_currency_convert( ohlcv_cols, raw_ohlcv_arrays, shifted_dates, sids, ) adjustments = self.adjustments_reader.load_pricing_adjustments( ohlcv_colnames, dates, sids, ) out = {} for c, c_raw, c_adjs in zip(ohlcv_cols, raw_ohlcv_arrays, adjustments): out[c] = AdjustedArray( c_raw.astype(c.dtype), c_adjs, c.missing_value, ) for c in currency_cols: codes_1d = self.raw_price_reader.currency_codes(sids) codes = repeat_first_axis(codes_1d, len(dates)) out[c] = AdjustedArray( codes, adjustments={}, missing_value=None, ) return out @property def currency_aware(self): # Tell the pipeline engine that this loader supports currency # conversion if we have a non-dummy fx rates reader. return not isinstance(self.fx_reader, ExplodingFXRateReader) def _inplace_currency_convert(self, columns, arrays, dates, sids): """ Currency convert raw data loaded for ``column``. Parameters ---------- columns : list[zipline.pipeline.data.BoundColumn] List of columns whose raw data has been loaded. arrays : list[np.array] List of arrays, parallel to ``columns`` containing data for the column. dates : pd.DatetimeIndex Labels for rows of ``arrays``. These are the dates that should be used to fetch fx rates for conversion. sids : np.array[int64] Labels for columns of ``arrays``. Returns ------- None Side Effects ------------ Modifies ``arrays`` in place by applying currency conversions. """ # Group columns by currency conversion spec. by_spec = defaultdict(list) for column, array in zip(columns, arrays): by_spec[column.currency_conversion].append(array) # Nothing to do for terms with no currency conversion. by_spec.pop(None, None) if not by_spec: return fx_reader = self.fx_reader base_currencies = self.raw_price_reader.currency_codes(sids) # Columns with the same conversion spec will use the same multipliers. for spec, arrays in by_spec.items(): rates = fx_reader.get_rates( rate=spec.field, quote=spec.currency.code, bases=base_currencies, dts=dates, ) for arr in arrays: multiply(arr, rates, out=arr) def _split_column_types(self, columns): """Split out currency columns from OHLCV columns. Parameters ---------- columns : list[zipline.pipeline.data.BoundColumn] Columns to be loaded by ``load_adjusted_array``. Returns ------- ohlcv_columns : list[zipline.pipeline.data.BoundColumn] Price and volume columns from ``columns``. currency_columns : list[zipline.pipeline.data.BoundColumn] Currency code column from ``columns``, if present. """ currency_name = EquityPricing.currency.name ohlcv = [] currency = [] for c in columns: if c.name == currency_name: currency.append(c) else: ohlcv.append(c) return ohlcv, currency
# Backwards compat alias. USEquityPricingLoader = EquityPricingLoader