#
# Copyright 2019 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, ABC, abstractmethod
from collections import namedtuple
import inspect
import warnings
import datetime
import numpy as np
import pandas as pd
import pytz
from toolz import curry
from zipline.utils.input_validation import preprocess
from zipline.utils.memoize import lazyval
from zipline.utils.sentinel import sentinel
from .context_tricks import nop_context
__all__ = [
"EventManager",
"Event",
"EventRule",
"StatelessRule",
"ComposedRule",
"Always",
"Never",
"AfterOpen",
"BeforeClose",
"NotHalfDay",
"NthTradingDayOfWeek",
"NDaysBeforeLastTradingDayOfWeek",
"NthTradingDayOfMonth",
"NDaysBeforeLastTradingDayOfMonth",
"StatefulRule",
"OncePerDay",
# Factory API
"date_rules",
"time_rules",
"calendars",
"make_eventrule",
]
MAX_MONTH_RANGE = 23
MAX_WEEK_RANGE = 5
def ensure_utc(time, tz="UTC"):
"""Normalize a time. If the time is tz-naive, assume it is UTC."""
if not time.tzinfo:
time = time.replace(tzinfo=pytz.timezone(tz))
return time.replace(tzinfo=pytz.utc)
def _out_of_range_error(a, b=None, var="offset"):
start = 0
if b is None:
end = a - 1
else:
start = a
end = b - 1
return ValueError(
"{var} must be in between {start} and {end} inclusive".format(
var=var,
start=start,
end=end,
)
)
def _td_check(td):
seconds = td.total_seconds()
# 43200 seconds = 12 hours
if 60 <= seconds <= 43200:
return td
else:
raise ValueError(
"offset must be in between 1 minute and 12 hours, " "inclusive."
)
def _build_offset(offset, kwargs, default):
"""
Builds the offset argument for event rules.
"""
# Filter down to just kwargs that were actually passed.
kwargs = {k: v for k, v in kwargs.items() if v is not None}
if offset is None:
if not kwargs:
return default # use the default.
else:
return _td_check(datetime.timedelta(**kwargs))
elif kwargs:
raise ValueError("Cannot pass kwargs and an offset")
elif isinstance(offset, datetime.timedelta):
return _td_check(offset)
else:
raise TypeError("Must pass 'hours' and/or 'minutes' as keywords")
def _build_date(date, kwargs):
"""
Builds the date argument for event rules.
"""
if date is None:
if not kwargs:
raise ValueError("Must pass a date or kwargs")
else:
return datetime.date(**kwargs)
elif kwargs:
raise ValueError("Cannot pass kwargs and a date")
else:
return date
# TODO: only used in tests
# TODO FIX TZ
def _build_time(time, kwargs):
"""Builds the time argument for event rules."""
tz = kwargs.pop("tz", "UTC")
if time:
if kwargs:
raise ValueError("Cannot pass kwargs and a time")
else:
return ensure_utc(time, tz)
elif not kwargs:
raise ValueError("Must pass a time or kwargs")
else:
return datetime.time(**kwargs)
@curry
def lossless_float_to_int(funcname, func, argname, arg):
"""
A preprocessor that coerces integral floats to ints.
Receipt of non-integral floats raises a TypeError.
"""
if not isinstance(arg, float):
return arg
arg_as_int = int(arg)
if arg == arg_as_int:
warnings.warn(
"{f} expected an int for argument {name!r}, but got float {arg}."
" Coercing to int.".format(
f=funcname,
name=argname,
arg=arg,
),
)
return arg_as_int
raise TypeError(arg)
class EventManager:
"""Manages a list of Event objects.
This manages the logic for checking the rules and dispatching to the
handle_data function of the Events.
Parameters
----------
create_context : (BarData) -> context manager, optional
An optional callback to produce a context manager to wrap the calls
to handle_data. This will be passed the current BarData.
"""
def __init__(self, create_context=None):
self._events = []
self._create_context = (
create_context if create_context is not None else lambda *_: nop_context
)
def add_event(self, event, prepend=False):
"""
Adds an event to the manager.
"""
if prepend:
self._events.insert(0, event)
else:
self._events.append(event)
def handle_data(self, context, data, dt):
with self._create_context(data):
for event in self._events:
event.handle_data(
context,
data,
dt,
)
class Event(namedtuple("Event", ["rule", "callback"])):
"""
An event is a pairing of an EventRule and a callable that will be invoked
with the current algorithm context, data, and datetime only when the rule
is triggered.
"""
def __new__(cls, rule, callback=None):
callback = callback or (lambda *args, **kwargs: None)
return super(cls, cls).__new__(cls, rule=rule, callback=callback)
def handle_data(self, context, data, dt):
"""
Calls the callable only when the rule is triggered.
"""
if self.rule.should_trigger(dt):
self.callback(context, data)
class EventRule(ABC):
"""A rule defining when a scheduled function should execute."""
# Instances of EventRule are assigned a calendar instance when scheduling
# a function.
_cal = None
@property
def cal(self):
return self._cal
@cal.setter
def cal(self, value):
self._cal = value
@abstractmethod
def should_trigger(self, dt):
"""
Checks if the rule should trigger with its current state.
This method should be pure and NOT mutate any state on the object.
"""
raise NotImplementedError("should_trigger")
class StatelessRule(EventRule):
"""
A stateless rule has no observable side effects.
This is reentrant and will always give the same result for the
same datetime.
Because these are pure, they can be composed to create new rules.
"""
def and_(self, rule):
"""
Logical and of two rules, triggers only when both rules trigger.
This follows the short circuiting rules for normal and.
"""
return ComposedRule(self, rule, ComposedRule.lazy_and)
__and__ = and_
class ComposedRule(StatelessRule):
"""
A rule that composes the results of two rules with some composing function.
The composing function should be a binary function that accepts the results
first(dt) and second(dt) as positional arguments.
For example, operator.and_.
If lazy=True, then the lazy composer is used instead. The lazy composer
expects a function that takes the two should_trigger functions and the
datetime. This is useful of you don't always want to call should_trigger
for one of the rules. For example, this is used to implement the & and |
operators so that they will have the same short circuit logic that is
expected.
"""
def __init__(self, first, second, composer):
if not (isinstance(first, StatelessRule) and isinstance(second, StatelessRule)):
raise ValueError("Only two StatelessRules can be composed")
self.first = first
self.second = second
self.composer = composer
def should_trigger(self, dt):
"""
Composes the two rules with a lazy composer.
"""
return self.composer(self.first.should_trigger, self.second.should_trigger, dt)
@staticmethod
def lazy_and(first_should_trigger, second_should_trigger, dt):
"""
Lazily ands the two rules. This will NOT call the should_trigger of the
second rule if the first one returns False.
"""
return first_should_trigger(dt) and second_should_trigger(dt)
@property
def cal(self):
return self.first.cal
@cal.setter
def cal(self, value):
# Thread the calendar through to the underlying rules.
self.first.cal = self.second.cal = value
class Always(StatelessRule):
"""
A rule that always triggers.
"""
@staticmethod
def always_trigger(dt):
"""
A should_trigger implementation that will always trigger.
"""
return True
should_trigger = always_trigger
class Never(StatelessRule):
"""
A rule that never triggers.
"""
@staticmethod
def never_trigger(dt):
"""
A should_trigger implementation that will never trigger.
"""
return False
should_trigger = never_trigger
class AfterOpen(StatelessRule):
"""
A rule that triggers for some offset after the market opens.
Example that triggers after 30 minutes of the market opening:
>>> AfterOpen(minutes=30) # doctest: +ELLIPSIS
<zipline.utils.events.AfterOpen object at ...>
"""
def __init__(self, offset=None, **kwargs):
self.offset = _build_offset(
offset,
kwargs,
datetime.timedelta(minutes=1), # Defaults to the first minute.
)
self._period_start = None
self._period_end = None
self._period_close = None
self._one_minute = datetime.timedelta(minutes=1)
def calculate_dates(self, dt):
"""Given a date, find that day's open and period end (open + offset)."""
period_start = self.cal.session_first_minute(self.cal.minute_to_session(dt))
period_close = self.cal.session_close(self.cal.minute_to_session(dt))
# Align the market open and close times here with the execution times
# used by the simulation clock. This ensures that scheduled functions
# trigger at the correct times.
if self.cal.name == "us_futures":
self._period_start = self.cal.execution_time_from_open(period_start)
self._period_close = self.cal.execution_time_from_close(period_close)
else:
self._period_start = period_start
self._period_close = period_close
self._period_end = self._period_start + self.offset - self._one_minute
def should_trigger(self, dt):
# There are two reasons why we might want to recalculate the dates.
# One is the first time we ever call should_trigger, when
# self._period_start is none. The second is when we're on a new day,
# and need to recalculate the dates. For performance reasons, we rely
# on the fact that our clock only ever ticks forward, since it's
# cheaper to do dt1 <= dt2 than dt1.date() != dt2.date(). This means
# that we will NOT correctly recognize a new date if we go backwards
# in time(which should never happen in a simulation, or in live
# trading)
if self._period_start is None or self._period_close <= dt:
self.calculate_dates(dt)
return dt == self._period_end
class BeforeClose(StatelessRule):
"""A rule that triggers for some offset time before the market closes.
Example that triggers for the last 30 minutes every day:
>>> BeforeClose(minutes=30) # doctest: +ELLIPSIS
<zipline.utils.events.BeforeClose object at ...>
"""
def __init__(self, offset=None, **kwargs):
self.offset = _build_offset(
offset,
kwargs,
datetime.timedelta(minutes=1), # Defaults to the last minute.
)
self._period_start = None
self._period_close = None
self._period_end = None
self._one_minute = datetime.timedelta(minutes=1)
def calculate_dates(self, dt):
"""
Given a dt, find that day's close and period start (close - offset).
"""
period_end = self.cal.session_close(self.cal.minute_to_session(dt))
# Align the market close time here with the execution time used by the
# simulation clock. This ensures that scheduled functions trigger at
# the correct times.
if self.cal == "us_futures":
self._period_end = self.cal.execution_time_from_close(period_end)
else:
self._period_end = period_end
self._period_start = self._period_end - self.offset
self._period_close = self._period_end
def should_trigger(self, dt):
# There are two reasons why we might want to recalculate the dates.
# One is the first time we ever call should_trigger, when
# self._period_start is none. The second is when we're on a new day,
# and need to recalculate the dates. For performance reasons, we rely
# on the fact that our clock only ever ticks forward, since it's
# cheaper to do dt1 <= dt2 than dt1.date() != dt2.date(). This means
# that we will NOT correctly recognize a new date if we go backwards
# in time(which should never happen in a simulation, or in live
# trading)
if self._period_start is None or self._period_close <= dt:
self.calculate_dates(dt)
return self._period_start == dt
class NotHalfDay(StatelessRule):
"""
A rule that only triggers when it is not a half day.
"""
def should_trigger(self, dt):
return self.cal.minute_to_session(dt) not in self.cal.early_closes
class TradingDayOfWeekRule(StatelessRule, metaclass=ABCMeta):
@preprocess(n=lossless_float_to_int("TradingDayOfWeekRule"))
def __init__(self, n, invert):
if not 0 <= n < MAX_WEEK_RANGE:
raise _out_of_range_error(MAX_WEEK_RANGE)
self.td_delta = (-n - 1) if invert else n
def should_trigger(self, dt):
# is this market minute's period in the list of execution periods?
val = self.cal.minute_to_session(dt, direction="none").value
return val in self.execution_period_values
@lazyval
def execution_period_values(self):
# calculate the list of periods that match the given criteria
sessions = self.cal.sessions
return set(
pd.Series(data=sessions)
# Group by ISO year (0) and week (1)
.groupby(sessions.map(lambda x: x.isocalendar()[0:2]))
.nth(self.td_delta)
.view(np.int64)
)
class NthTradingDayOfWeek(TradingDayOfWeekRule):
"""
A rule that triggers on the nth trading day of the week.
This is zero-indexed, n=0 is the first trading day of the week.
"""
def __init__(self, n):
super(NthTradingDayOfWeek, self).__init__(n, invert=False)
class NDaysBeforeLastTradingDayOfWeek(TradingDayOfWeekRule):
"""
A rule that triggers n days before the last trading day of the week.
"""
def __init__(self, n):
super(NDaysBeforeLastTradingDayOfWeek, self).__init__(n, invert=True)
class TradingDayOfMonthRule(StatelessRule, metaclass=ABCMeta):
@preprocess(n=lossless_float_to_int("TradingDayOfMonthRule"))
def __init__(self, n, invert):
if not 0 <= n < MAX_MONTH_RANGE:
raise _out_of_range_error(MAX_MONTH_RANGE)
if invert:
self.td_delta = -n - 1
else:
self.td_delta = n
def should_trigger(self, dt):
# is this market minute's period in the list of execution periods?
value = self.cal.minute_to_session(dt, direction="none").value
return value in self.execution_period_values
@lazyval
def execution_period_values(self):
# calculate the list of periods that match the given criteria
sessions = self.cal.sessions
return set(
pd.Series(data=sessions)
.groupby([sessions.year, sessions.month])
.nth(self.td_delta)
.astype(np.int64)
)
class NthTradingDayOfMonth(TradingDayOfMonthRule):
"""
A rule that triggers on the nth trading day of the month.
This is zero-indexed, n=0 is the first trading day of the month.
"""
def __init__(self, n):
super(NthTradingDayOfMonth, self).__init__(n, invert=False)
class NDaysBeforeLastTradingDayOfMonth(TradingDayOfMonthRule):
"""
A rule that triggers n days before the last trading day of the month.
"""
def __init__(self, n):
super(NDaysBeforeLastTradingDayOfMonth, self).__init__(n, invert=True)
# Stateful rules
class StatefulRule(EventRule):
"""
A stateful rule has state.
This rule will give different results for the same datetimes depending
on the internal state that this holds.
StatefulRules wrap other rules as state transformers.
"""
def __init__(self, rule=None):
self.rule = rule or Always()
@property
def cal(self):
return self.rule.cal
@cal.setter
def cal(self, value):
# Thread the calendar through to the underlying rule.
self.rule.cal = value
class OncePerDay(StatefulRule):
def __init__(self, rule=None):
self.triggered = False
self.date = None
self.next_date = None
super(OncePerDay, self).__init__(rule)
def should_trigger(self, dt):
if self.date is None or dt >= self.next_date:
# initialize or reset for new date
self.triggered = False
self.date = dt
# record the timestamp for the next day, so that we can use it
# to know if we've moved to the next day
self.next_date = dt + pd.Timedelta(1, unit="d")
if not self.triggered and self.rule.should_trigger(dt):
self.triggered = True
return True
# Factory API
[docs]class date_rules:
"""Factories for date-based :func:`~zipline.api.schedule_function` rules.
See Also
--------
:func:`~zipline.api.schedule_function`
"""
[docs] @staticmethod
def every_day():
"""Create a rule that triggers every day.
Returns
-------
rule : zipline.utils.events.EventRule
"""
return Always()
[docs] @staticmethod
def month_start(days_offset=0):
"""
Create a rule that triggers a fixed number of trading days after the
start of each month.
Parameters
----------
days_offset : int, optional
Number of trading days to wait before triggering each
month. Default is 0, i.e., trigger on the first trading day of the
month.
Returns
-------
rule : zipline.utils.events.EventRule
"""
return NthTradingDayOfMonth(n=days_offset)
[docs] @staticmethod
def month_end(days_offset=0):
"""
Create a rule that triggers a fixed number of trading days before the
end of each month.
Parameters
----------
days_offset : int, optional
Number of trading days prior to month end to trigger. Default is 0,
i.e., trigger on the last day of the month.
Returns
-------
rule : zipline.utils.events.EventRule
"""
return NDaysBeforeLastTradingDayOfMonth(n=days_offset)
[docs] @staticmethod
def week_start(days_offset=0):
"""
Create a rule that triggers a fixed number of trading days after the
start of each week.
Parameters
----------
days_offset : int, optional
Number of trading days to wait before triggering each week. Default
is 0, i.e., trigger on the first trading day of the week.
"""
return NthTradingDayOfWeek(n=days_offset)
[docs] @staticmethod
def week_end(days_offset=0):
"""
Create a rule that triggers a fixed number of trading days before the
end of each week.
Parameters
----------
days_offset : int, optional
Number of trading days prior to week end to trigger. Default is 0,
i.e., trigger on the last trading day of the week.
"""
return NDaysBeforeLastTradingDayOfWeek(n=days_offset)
[docs]class time_rules:
"""Factories for time-based :func:`~zipline.api.schedule_function` rules.
See Also
--------
:func:`~zipline.api.schedule_function`
"""
[docs] @staticmethod
def market_open(offset=None, hours=None, minutes=None):
"""
Create a rule that triggers at a fixed offset from market open.
The offset can be specified either as a :class:`datetime.timedelta`, or
as a number of hours and minutes.
Parameters
----------
offset : datetime.timedelta, optional
If passed, the offset from market open at which to trigger. Must be
at least 1 minute.
hours : int, optional
If passed, number of hours to wait after market open.
minutes : int, optional
If passed, number of minutes to wait after market open.
Returns
-------
rule : zipline.utils.events.EventRule
Notes
-----
If no arguments are passed, the default offset is one minute after
market open.
If ``offset`` is passed, ``hours`` and ``minutes`` must not be
passed. Conversely, if either ``hours`` or ``minutes`` are passed,
``offset`` must not be passed.
"""
return AfterOpen(offset=offset, hours=hours, minutes=minutes)
[docs] @staticmethod
def market_close(offset=None, hours=None, minutes=None):
"""
Create a rule that triggers at a fixed offset from market close.
The offset can be specified either as a :class:`datetime.timedelta`, or
as a number of hours and minutes.
Parameters
----------
offset : datetime.timedelta, optional
If passed, the offset from market close at which to trigger. Must
be at least 1 minute.
hours : int, optional
If passed, number of hours to wait before market close.
minutes : int, optional
If passed, number of minutes to wait before market close.
Returns
-------
rule : zipline.utils.events.EventRule
Notes
-----
If no arguments are passed, the default offset is one minute before
market close.
If ``offset`` is passed, ``hours`` and ``minutes`` must not be
passed. Conversely, if either ``hours`` or ``minutes`` are passed,
``offset`` must not be passed.
"""
return BeforeClose(offset=offset, hours=hours, minutes=minutes)
every_minute = Always
class calendars:
US_EQUITIES = sentinel("US_EQUITIES")
US_FUTURES = sentinel("US_FUTURES")
def _invert(d):
return dict(zip(d.values(), d.keys()))
_uncalled_rules = _invert(vars(date_rules))
_uncalled_rules.update(_invert(vars(time_rules)))
def _check_if_not_called(v):
try:
name = _uncalled_rules[v]
except KeyError:
if not (inspect.isclass(v) and issubclass(v, EventRule)):
return
name = getattr(v, "__name__", None)
msg = "invalid rule: %r" % (v,)
if name is not None:
msg += " (hint: did you mean %s())" % name
raise TypeError(msg)
def make_eventrule(date_rule, time_rule, cal, half_days=True):
"""
Constructs an event rule from the factory api.
"""
_check_if_not_called(date_rule)
_check_if_not_called(time_rule)
if half_days:
inner_rule = date_rule & time_rule
else:
inner_rule = date_rule & time_rule & NotHalfDay()
opd = OncePerDay(rule=inner_rule)
# This is where a scheduled function's rule is associated with a calendar.
opd.cal = cal
return opd