Source code for zipline.finance.metrics.metric

#
# Copyright 2018 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from functools import partial
import operator as op

from dateutil.relativedelta import relativedelta
import empyrical as ep
import numpy as np
import pandas as pd

from zipline.utils.exploding_object import NamedExplodingObject
from zipline.finance._finance_ext import minute_annual_volatility


[docs]class SimpleLedgerField: """Emit the current value of a ledger field every bar or every session. Parameters ---------- ledger_field : str The ledger field to read. packet_field : str, optional The name of the field to populate in the packet. If not provided, ``ledger_field`` will be used. """ def __init__(self, ledger_field, packet_field=None): self._get_ledger_field = op.attrgetter(ledger_field) if packet_field is None: self._packet_field = ledger_field.rsplit(".", 1)[-1] else: self._packet_field = packet_field def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): packet["minute_perf"][self._packet_field] = self._get_ledger_field( ledger, ) def end_of_session(self, packet, ledger, session, session_ix, data_portal): packet["daily_perf"][self._packet_field] = self._get_ledger_field( ledger, )
[docs]class DailyLedgerField: """Like :class:`~zipline.finance.metrics.metric.SimpleLedgerField` but also puts the current value in the ``cumulative_perf`` section. Parameters ---------- ledger_field : str The ledger field to read. packet_field : str, optional The name of the field to populate in the packet. If not provided, ``ledger_field`` will be used. """ def __init__(self, ledger_field, packet_field=None): self._get_ledger_field = op.attrgetter(ledger_field) if packet_field is None: self._packet_field = ledger_field.rsplit(".", 1)[-1] else: self._packet_field = packet_field def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): field = self._packet_field packet["cumulative_perf"][field] = packet["minute_perf"][ field ] = self._get_ledger_field(ledger) def end_of_session(self, packet, ledger, session, session_ix, data_portal): field = self._packet_field packet["cumulative_perf"][field] = packet["daily_perf"][ field ] = self._get_ledger_field(ledger)
[docs]class StartOfPeriodLedgerField: """Keep track of the value of a ledger field at the start of the period. Parameters ---------- ledger_field : str The ledger field to read. packet_field : str, optional The name of the field to populate in the packet. If not provided, ``ledger_field`` will be used. """ def __init__(self, ledger_field, packet_field=None): self._get_ledger_field = op.attrgetter(ledger_field) if packet_field is None: self._packet_field = ledger_field.rsplit(".", 1)[-1] else: self._packet_field = packet_field def start_of_simulation( self, ledger, emission_rate, trading_calendar, sessions, benchmark_source ): self._start_of_simulation = self._get_ledger_field(ledger) def start_of_session(self, ledger, session, data_portal): self._previous_day = self._get_ledger_field(ledger) def _end_of_period(self, sub_field, packet, ledger): packet_field = self._packet_field packet["cumulative_perf"][packet_field] = self._start_of_simulation packet[sub_field][packet_field] = self._previous_day def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): self._end_of_period("minute_perf", packet, ledger) def end_of_session(self, packet, ledger, session, session_ix, data_portal): self._end_of_period("daily_perf", packet, ledger)
[docs]class Returns: """Tracks the daily and cumulative returns of the algorithm.""" def _end_of_period(field, packet, ledger, dt, session_ix, data_portal): packet[field]["returns"] = ledger.todays_returns packet["cumulative_perf"]["returns"] = ledger.portfolio.returns packet["cumulative_risk_metrics"][ "algorithm_period_return" ] = ledger.portfolio.returns end_of_bar = partial(_end_of_period, "minute_perf") end_of_session = partial(_end_of_period, "daily_perf")
[docs]class BenchmarkReturnsAndVolatility: """Tracks daily and cumulative returns for the benchmark as well as the volatility of the benchmark returns. """ def start_of_simulation( self, ledger, emission_rate, trading_calendar, sessions, benchmark_source ): daily_returns_series = benchmark_source.daily_returns( sessions[0], sessions[-1], ) self._daily_returns = daily_returns_array = daily_returns_series.values self._daily_cumulative_returns = np.cumprod(1 + daily_returns_array) - 1 self._daily_annual_volatility = ( daily_returns_series.expanding(2).std(ddof=1) * np.sqrt(252) ).values if emission_rate == "daily": self._minute_cumulative_returns = NamedExplodingObject( "self._minute_cumulative_returns", "does not exist in daily emission rate", ) self._minute_annual_volatility = NamedExplodingObject( "self._minute_annual_volatility", "does not exist in daily emission rate", ) else: open_ = trading_calendar.session_open(sessions[0]) close = trading_calendar.session_close(sessions[-1]) returns = benchmark_source.get_range(open_, close) self._minute_cumulative_returns = (1 + returns).cumprod() - 1 self._minute_annual_volatility = pd.Series( minute_annual_volatility( returns.index.normalize().view("int64"), returns.values, daily_returns_array, ), index=returns.index, ) def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): r = self._minute_cumulative_returns[dt] if np.isnan(r): r = None packet["cumulative_risk_metrics"]["benchmark_period_return"] = r v = self._minute_annual_volatility[dt] if np.isnan(v): v = None packet["cumulative_risk_metrics"]["benchmark_volatility"] = v def end_of_session(self, packet, ledger, session, session_ix, data_portal): r = self._daily_cumulative_returns[session_ix] if np.isnan(r): r = None packet["cumulative_risk_metrics"]["benchmark_period_return"] = r v = self._daily_annual_volatility[session_ix] if np.isnan(v): v = None packet["cumulative_risk_metrics"]["benchmark_volatility"] = v
class PNL: """Tracks daily and cumulative PNL.""" def start_of_simulation( self, ledger, emission_rate, trading_calendar, sessions, benchmark_source ): self._previous_pnl = 0.0 def start_of_session(self, ledger, session, data_portal): self._previous_pnl = ledger.portfolio.pnl def _end_of_period(self, field, packet, ledger): pnl = ledger.portfolio.pnl packet[field]["pnl"] = pnl - self._previous_pnl packet["cumulative_perf"]["pnl"] = ledger.portfolio.pnl def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): self._end_of_period("minute_perf", packet, ledger) def end_of_session(self, packet, ledger, session, session_ix, data_portal): self._end_of_period("daily_perf", packet, ledger)
[docs]class CashFlow: """Tracks daily and cumulative cash flow. Notes ----- For historical reasons, this field is named 'capital_used' in the packets. """ def start_of_simulation( self, ledger, emission_rate, trading_calendar, sessions, benchmark_source ): self._previous_cash_flow = 0.0 def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): cash_flow = ledger.portfolio.cash_flow packet["minute_perf"]["capital_used"] = cash_flow - self._previous_cash_flow packet["cumulative_perf"]["capital_used"] = cash_flow def end_of_session(self, packet, ledger, session, session_ix, data_portal): cash_flow = ledger.portfolio.cash_flow packet["daily_perf"]["capital_used"] = cash_flow - self._previous_cash_flow packet["cumulative_perf"]["capital_used"] = cash_flow self._previous_cash_flow = cash_flow
[docs]class Orders: """Tracks daily orders.""" def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): packet["minute_perf"]["orders"] = ledger.orders(dt) def end_of_session(self, packet, ledger, dt, session_ix, data_portal): packet["daily_perf"]["orders"] = ledger.orders()
[docs]class Transactions: """Tracks daily transactions.""" def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): packet["minute_perf"]["transactions"] = ledger.transactions(dt) def end_of_session(self, packet, ledger, dt, session_ix, data_portal): packet["daily_perf"]["transactions"] = ledger.transactions()
[docs]class Positions: """Tracks daily positions.""" def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): packet["minute_perf"]["positions"] = ledger.positions(dt) def end_of_session(self, packet, ledger, dt, session_ix, data_portal): packet["daily_perf"]["positions"] = ledger.positions()
[docs]class ReturnsStatistic: """A metric that reports an end of simulation scalar or time series computed from the algorithm returns. Parameters ---------- function : callable The function to call on the daily returns. field_name : str, optional The name of the field. If not provided, it will be ``function.__name__``. """ def __init__(self, function, field_name=None): if field_name is None: field_name = function.__name__ self._function = function self._field_name = field_name def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): res = self._function(ledger.daily_returns_array[: session_ix + 1]) if not np.isfinite(res): res = None packet["cumulative_risk_metrics"][self._field_name] = res end_of_session = end_of_bar
[docs]class AlphaBeta: """End of simulation alpha and beta to the benchmark.""" def start_of_simulation( self, ledger, emission_rate, trading_calendar, sessions, benchmark_source ): self._daily_returns_array = benchmark_source.daily_returns( sessions[0], sessions[-1], ).values def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): risk = packet["cumulative_risk_metrics"] alpha, beta = ep.alpha_beta_aligned( ledger.daily_returns_array[: session_ix + 1], self._daily_returns_array[: session_ix + 1], ) if not np.isfinite(alpha): alpha = None if np.isnan(beta): beta = None risk["alpha"] = alpha risk["beta"] = beta end_of_session = end_of_bar
[docs]class MaxLeverage: """Tracks the maximum account leverage.""" def start_of_simulation(self, *args): self._max_leverage = 0.0 def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): self._max_leverage = max(self._max_leverage, ledger.account.leverage) packet["cumulative_risk_metrics"]["max_leverage"] = self._max_leverage end_of_session = end_of_bar
class NumTradingDays: """Report the number of trading days.""" def start_of_simulation(self, *args): self._num_trading_days = 0 def start_of_session(self, *args): self._num_trading_days += 1 def end_of_bar(self, packet, ledger, dt, session_ix, data_portal): packet["cumulative_risk_metrics"]["trading_days"] = self._num_trading_days end_of_session = end_of_bar class _ConstantCumulativeRiskMetric: """A metric which does not change, ever. Notes ----- This exists to maintain the existing structure of the perf packets. We should kill this as soon as possible. """ def __init__(self, field, value): self._field = field self._value = value def end_of_bar(self, packet, *args): packet["cumulative_risk_metrics"][self._field] = self._value def end_of_session(self, packet, *args): packet["cumulative_risk_metrics"][self._field] = self._value class PeriodLabel: """Backwards compat, please kill me.""" def start_of_session(self, ledger, session, data_portal): self._label = session.strftime("%Y-%m") def end_of_bar(self, packet, *args): packet["cumulative_risk_metrics"]["period_label"] = self._label end_of_session = end_of_bar class _ClassicRiskMetrics: """Produces original risk packet.""" def start_of_simulation( self, ledger, emission_rate, trading_calendar, sessions, benchmark_source ): self._leverages = np.full_like(sessions, np.nan, dtype="float64") def end_of_session(self, packet, ledger, dt, session_ix, data_portal): self._leverages[session_ix] = ledger.account.leverage @classmethod def risk_metric_period( cls, start_session, end_session, algorithm_returns, benchmark_returns, algorithm_leverages, ): """ Creates a dictionary representing the state of the risk report. Parameters ---------- start_session : pd.Timestamp Start of period (inclusive) to produce metrics on end_session : pd.Timestamp End of period (inclusive) to produce metrics on algorithm_returns : pd.Series(pd.Timestamp -> float) Series of algorithm returns as of the end of each session benchmark_returns : pd.Series(pd.Timestamp -> float) Series of benchmark returns as of the end of each session algorithm_leverages : pd.Series(pd.Timestamp -> float) Series of algorithm leverages as of the end of each session Returns ------- risk_metric : dict[str, any] Dict of metrics that with fields like: { 'algorithm_period_return': 0.0, 'benchmark_period_return': 0.0, 'treasury_period_return': 0, 'excess_return': 0.0, 'alpha': 0.0, 'beta': 0.0, 'sharpe': 0.0, 'sortino': 0.0, 'period_label': '1970-01', 'trading_days': 0, 'algo_volatility': 0.0, 'benchmark_volatility': 0.0, 'max_drawdown': 0.0, 'max_leverage': 0.0, } """ algorithm_returns = algorithm_returns[ (algorithm_returns.index >= start_session) & (algorithm_returns.index <= end_session) ] # Benchmark needs to be masked to the same dates as the algo returns benchmark_ret_tzinfo = benchmark_returns.index.tzinfo benchmark_returns = benchmark_returns[ (benchmark_returns.index >= start_session.tz_localize(benchmark_ret_tzinfo)) & ( benchmark_returns.index <= algorithm_returns.index[-1].tz_localize(benchmark_ret_tzinfo) ) ] benchmark_period_returns = ep.cum_returns(benchmark_returns).iloc[-1] algorithm_period_returns = ep.cum_returns(algorithm_returns).iloc[-1] alpha, beta = ep.alpha_beta_aligned( algorithm_returns.values, benchmark_returns.values, ) benchmark_volatility = ep.annual_volatility(benchmark_returns) sharpe = ep.sharpe_ratio(algorithm_returns) # The consumer currently expects a 0.0 value for sharpe in period, # this differs from cumulative which was np.nan. # When factoring out the sharpe_ratio, the different return types # were collapsed into `np.nan`. # TODO: Either fix consumer to accept `np.nan` or make the # `sharpe_ratio` return type configurable. # In the meantime, convert nan values to 0.0 if pd.isnull(sharpe): sharpe = 0.0 sortino = ep.sortino_ratio( algorithm_returns.values, _downside_risk=ep.downside_risk(algorithm_returns.values), ) rval = { "algorithm_period_return": algorithm_period_returns, "benchmark_period_return": benchmark_period_returns, "treasury_period_return": 0, "excess_return": algorithm_period_returns, "alpha": alpha, "beta": beta, "sharpe": sharpe, "sortino": sortino, "period_label": end_session.strftime("%Y-%m"), "trading_days": len(benchmark_returns), "algo_volatility": ep.annual_volatility(algorithm_returns), "benchmark_volatility": benchmark_volatility, "max_drawdown": ep.max_drawdown(algorithm_returns.values), "max_leverage": algorithm_leverages.max(), } # check if a field in rval is nan or inf, and replace it with None # except period_label which is always a str return { k: (None if k != "period_label" and not np.isfinite(v) else v) for k, v in rval.items() } @classmethod def _periods_in_range( cls, months, end_session, end_date, algorithm_returns, benchmark_returns, algorithm_leverages, months_per, ): if months.size < months_per: return tzinfo = end_date.tzinfo end_date = end_date for period_timestamp in months: period = period_timestamp.tz_localize(None).to_period( freq="%dM" % months_per ) if period.end_time > end_date: break yield cls.risk_metric_period( start_session=period.start_time.tz_localize(tzinfo), end_session=min(period.end_time, end_session).tz_localize(tzinfo), algorithm_returns=algorithm_returns, benchmark_returns=benchmark_returns, algorithm_leverages=algorithm_leverages, ) @classmethod def risk_report(cls, algorithm_returns, benchmark_returns, algorithm_leverages): start_session = algorithm_returns.index[0] end_session = algorithm_returns.index[-1] end = end_session.replace(day=1) + relativedelta(months=1) months = pd.date_range( start=start_session, # Ensure we have at least one month end=end - datetime.timedelta(days=1), freq="M", tz="utc", ) periods_in_range = partial( cls._periods_in_range, months=months, end_session=end_session, end_date=end, algorithm_returns=algorithm_returns, benchmark_returns=benchmark_returns, algorithm_leverages=algorithm_leverages, ) return { "one_month": list(periods_in_range(months_per=1)), "three_month": list(periods_in_range(months_per=3)), "six_month": list(periods_in_range(months_per=6)), "twelve_month": list(periods_in_range(months_per=12)), } def end_of_simulation( self, packet, ledger, trading_calendar, sessions, data_portal, benchmark_source ): packet.update( self.risk_report( algorithm_returns=ledger.daily_returns_series, benchmark_returns=benchmark_source.daily_returns( sessions[0], sessions[-1], ), algorithm_leverages=self._leverages, ) )