Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return Tickfilter((2, 3, 67, 70), self)
def bidasks(self) -> "Tickfilter":
    """
    Emit bid and ask ticks.

    Returns a :class:`Tickfilter` attached to this event that passes
    only the tick types listed below.
    """
    tick_types = (0, 1, 66, 69, 2, 3, 67, 70)
    return Tickfilter(tick_types, self)
def midpoints(self) -> "Tickfilter":
    """
    Emit midpoint ticks.

    Returns a ``Midpoints`` operator attached to this event; it is
    created with an empty collection of tick types.
    """
    no_tick_types = ()
    return Midpoints(no_tick_types, self)
class Tickfilter(Op):
"""
Tick filtering event operators that ``emit(time, price, size)``.
"""
__slots__ = ('_tickTypes',)
def __init__(self, tickTypes, source=None):
    """
    Create a filter that lets through only the given tick types.

    Args:
        tickTypes: Iterable of tick type values to accept; it is
            copied into a set for membership tests.
        source: Handed on unchanged to ``Op.__init__``.
    """
    Op.__init__(self, source)
    accepted = set(tickTypes)
    self._tickTypes = accepted
def on_source(self, ticker):
    """
    Re-emit the ticks of ``ticker`` that pass the filter.

    Every tick in ``ticker.ticks`` whose ``tickType`` is one of the
    accepted types is emitted as ``(time, price, size)``, in the
    order the ticks appear; all other ticks are dropped.
    """
    for tick in ticker.ticks:
        if tick.tickType not in self._tickTypes:
            continue
        self.emit(tick.time, tick.price, tick.size)
def timebars(self, timer: Event) -> "TimeBars":
"""
def _on_timer(self, time):
    """
    Timer callback: finish the bar in progress and open a new one.

    When at least one bar exists, the latest bar is announced through
    ``bars.updateEvent`` (with ``True`` as second argument) and then
    emitted.  If that bar's close is still NaN and there is an earlier
    bar, all four of its prices are first set to the earlier bar's
    close.  In every case a fresh ``Bar(time)`` is appended afterwards.
    """
    bars = self.bars
    if bars:
        current = bars[-1]
        if isNan(current.close) and len(bars) > 1:
            carried = bars[-2].close
            # Assigned in the same order as a chained assignment would.
            current.open = carried
            current.high = carried
            current.low = carried
            current.close = carried
        bars.updateEvent.emit(bars, True)
        self.emit(current)
    bars.append(Bar(time))
def _on_timer_done(self, timer):
    """
    Timer completion callback.

    Drops the reference to the timer and marks this operator as done.
    The ``timer`` argument itself is not used.
    """
    self._timer = None
    self.set_done()
class TickBars(Op):
__slots__ = ('_count', 'bars')
__doc__ = Tickfilter.tickbars.__doc__
def __init__(self, count, source=None):
    """
    Create a tick-count bar aggregator.

    Args:
        count: Tick count at which the latest bar is treated as full.
        source: Handed on unchanged to ``Op.__init__``.
    """
    Op.__init__(self, source)
    self._count = count
    # Starts out empty; bars are added by on_source.
    self.bars: BarList = BarList()
def on_source(self, time, price, size):
    """
    Fold one tick into the bar series.

    A new bar seeded from this tick is started when there are no bars
    yet or when the latest bar already holds ``self._count`` ticks.
    Otherwise the tick is merged into the latest bar: high/low are
    widened, close follows the tick price, and volume and tick count
    are accumulated.

    Args:
        time: Time of the tick; used as the time of a newly started bar.
        price: Tick price.
        size: Tick size, added to the bar's volume.
    """
    bars = self.bars
    if not bars or bars[-1].count == self._count:
        # Presumably positional order follows Bar.defaults:
        # time, open, high, low, close, volume, count.
        bar = Bar(time, price, price, price, price, size, 1)
        bars.append(bar)
    else:
        bar = bars[-1]
        bar.high = max(bar.high, price)
        bar.low = min(bar.low, price)
        # Keep the remaining fields in step with the tick.  Without the
        # count increment the rollover test above can never become true
        # for a count greater than one.
        bar.close = price
        bar.volume += size
        bar.count += 1
    # NOTE(review): nothing is emitted from this method in the visible
    # excerpt -- confirm against the full source whether a completed bar
    # should be emitted here.
def _on_timer(self, time):
    """
    Timer callback: finish the bar in progress and open a new one.

    When at least one bar exists, the latest bar is announced through
    ``bars.updateEvent`` (with ``True`` as second argument) and then
    emitted.  If that bar's close is still NaN and there is an earlier
    bar, all four of its prices are first set to the earlier bar's
    close.  In every case a fresh ``Bar(time)`` is appended afterwards.
    """
    bars = self.bars
    if bars:
        current = bars[-1]
        if isNan(current.close) and len(bars) > 1:
            carried = bars[-2].close
            # Assigned in the same order as a chained assignment would.
            current.open = carried
            current.high = carried
            current.low = carried
            current.close = carried
        bars.updateEvent.emit(bars, True)
        self.emit(current)
    bars.append(Bar(time))
def _on_timer_done(self, timer):
    """
    Timer completion callback.

    Drops the reference to the timer and marks this operator as done.
    The ``timer`` argument itself is not used.
    """
    self._timer = None
    self.set_done()
class TickBars(Op):
__slots__ = ('_count', 'bars')
__doc__ = Tickfilter.tickbars.__doc__
def __init__(self, count, source=None):
    """
    Create a tick-count bar aggregator.

    Args:
        count: Tick count at which the latest bar is treated as full.
        source: Handed on unchanged to ``Op.__init__``.
    """
    Op.__init__(self, source)
    self._count = count
    # Starts out empty; bars are added by on_source.
    self.bars: BarList = BarList()
def on_source(self, time, price, size):
    """
    Fold one tick into the bar series.

    A new bar seeded from this tick is started when there are no bars
    yet or when the latest bar already holds ``self._count`` ticks.
    Otherwise the tick is merged into the latest bar: high/low are
    widened, close follows the tick price, and volume and tick count
    are accumulated.

    Args:
        time: Time of the tick; used as the time of a newly started bar.
        price: Tick price.
        size: Tick size, added to the bar's volume.
    """
    bars = self.bars
    if not bars or bars[-1].count == self._count:
        # Presumably positional order follows Bar.defaults:
        # time, open, high, low, close, volume, count.
        bar = Bar(time, price, price, price, price, size, 1)
        bars.append(bar)
    else:
        bar = bars[-1]
        bar.high = max(bar.high, price)
        bar.low = min(bar.low, price)
        # Keep the remaining fields in step with the tick.  Without the
        # count increment the rollover test above can never become true
        # for a count greater than one.
        bar.close = price
        bar.volume += size
        bar.count += 1
    # NOTE(review): nothing is emitted from this method in the visible
    # excerpt -- confirm against the full source whether a completed bar
    # should be emitted here.
return Tickfilter((2, 3, 67, 70), self)
def bidasks(self) -> "Tickfilter":
    """
    Emit bid and ask ticks.

    Returns a :class:`Tickfilter` attached to this event that passes
    only the tick types listed below.
    """
    tick_types = (0, 1, 66, 69, 2, 3, 67, 70)
    return Tickfilter(tick_types, self)
def midpoints(self) -> "Tickfilter":
    """
    Emit midpoint ticks.

    Returns a ``Midpoints`` operator attached to this event; it is
    created with an empty collection of tick types.
    """
    no_tick_types = ()
    return Midpoints(no_tick_types, self)
class Tickfilter(Op):
"""
Tick filtering event operators that ``emit(time, price, size)``.
"""
__slots__ = ('_tickTypes',)
def __init__(self, tickTypes, source=None):
    """
    Create a filter that lets through only the given tick types.

    Args:
        tickTypes: Iterable of tick type values to accept; it is
            copied into a set for membership tests.
        source: Handed on unchanged to ``Op.__init__``.
    """
    Op.__init__(self, source)
    accepted = set(tickTypes)
    self._tickTypes = accepted
def on_source(self, ticker):
    """
    Re-emit the ticks of ``ticker`` that pass the filter.

    Every tick in ``ticker.ticks`` whose ``tickType`` is one of the
    accepted types is emitted as ``(time, price, size)``, in the
    order the ticks appear; all other ticks are dropped.
    """
    for tick in ticker.ticks:
        if tick.tickType not in self._tickTypes:
            continue
        self.emit(tick.time, tick.price, tick.size)
def timebars(self, timer: Event) -> "TimeBars":
"""
class Bar(Object):
    """
    Price bar with open/high/low/close, volume and tick count.

    ``defaults`` maps each field to its initial value: ``time`` is
    ``None``, the four prices are NaN, and ``volume`` and ``count``
    are zero.  The same mapping is used as ``__slots__``, so its keys
    are the slot names.
    """
    defaults = {
        'time': None,
        'open': nan,
        'high': nan,
        'low': nan,
        'close': nan,
        'volume': 0,
        'count': 0,
    }
    __slots__ = defaults
class TimeBars(Op):
__slots__ = ('_timer', 'bars',)
__doc__ = Tickfilter.timebars.__doc__
def __init__(self, timer, source=None):
    """
    Create a timer-driven bar aggregator.

    Args:
        timer: Event whose emissions trigger ``_on_timer`` and whose
            completion triggers ``_on_timer_done``.
        source: Handed on unchanged to ``Op.__init__``.
    """
    Op.__init__(self, source)
    self._timer = timer
    # Second argument (None) is passed through as-is, as before.
    timer.connect(self._on_timer, None, self._on_timer_done)
    # Starts out empty; the first bar is appended by _on_timer.
    self.bars: BarList = BarList()
def on_source(self, time, price, size):
    """
    Merge one tick into the bar that is currently being built.

    Ticks that arrive before any bar exists are ignored.  The first
    tick of a bar (open still NaN) initialises open, high and low;
    every tick then widens high/low, moves close to the tick price and
    accumulates volume and tick count.

    Args:
        time: Time of the tick (not used here).
        price: Tick price.
        size: Tick size, added to the bar's volume.
    """
    if not self.bars:
        return
    bar = self.bars[-1]
    if isNan(bar.open):
        bar.open = bar.high = bar.low = price
    bar.high = max(bar.high, price)
    # Mirror the high update for the low, and maintain the remaining
    # fields: _on_timer checks isNan(bar.close) to recognise a bar that
    # received no tick, so close must be set whenever a tick arrives.
    bar.low = min(bar.low, price)
    bar.close = price
    bar.volume += size
    bar.count += 1
    # NOTE(review): no update event is emitted from this method in the
    # visible excerpt -- confirm against the full source whether
    # listeners should be notified of the partial bar here.
class Bar(Object):
    """
    Price bar with open/high/low/close, volume and tick count.

    ``defaults`` maps each field to its initial value: ``time`` is
    ``None``, the four prices are NaN, and ``volume`` and ``count``
    are zero.  The same mapping is used as ``__slots__``, so its keys
    are the slot names.
    """
    defaults = {
        'time': None,
        'open': nan,
        'high': nan,
        'low': nan,
        'close': nan,
        'volume': 0,
        'count': 0,
    }
    __slots__ = defaults
class TimeBars(Op):
__slots__ = ('_timer', 'bars',)
__doc__ = Tickfilter.timebars.__doc__
def __init__(self, timer, source=None):
    """
    Create a timer-driven bar aggregator.

    Args:
        timer: Event whose emissions trigger ``_on_timer`` and whose
            completion triggers ``_on_timer_done``.
        source: Handed on unchanged to ``Op.__init__``.
    """
    Op.__init__(self, source)
    self._timer = timer
    # Second argument (None) is passed through as-is, as before.
    timer.connect(self._on_timer, None, self._on_timer_done)
    # Starts out empty; the first bar is appended by _on_timer.
    self.bars: BarList = BarList()
def on_source(self, time, price, size):
if not self.bars:
return
bar = self.bars[-1]
if isNan(bar.open):
bar.open = bar.high = bar.low = price
bar.high = max(bar.high, price)