Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
month = int(dt.month - 1 + real_months)
year = int(dt.year + month // 12) # Integer division
month = int(month % 12 + 1)
day = min(dt.day, calendar.monthrange(year, month)[1]) # Avoid day 30 in February
dt = dt.replace(year=year, month=month, day=day)
# Subtract the approximate number seconds in the number of months
secs = secs % (real_months * SECONDS[1][1])
except (OverflowError, ValueError):
pass
else:
dt = dt + datetime.timedelta(seconds=secs)
ts = Timestamp(dt.timestamp())
if ts is None:
# Without years or months given, we can simply add our delta to time.time()
ts = Timestamp(time.time() + self)
return ts
`which` must be 'Scrape' or 'Announce'.
transmission.h says:
/* when the next periodic (announce|scrape) message will be sent out.
if (announce|scrape)State isn't TR_TRACKER_WAITING, this field is undefined */
"""
state = tracker['%sState' % which.lower()]
if state == 1: # TR_TRACKER_WAITING = 1
return tracker['next%sTime' % which]
elif state == 0: # Torrent is paused
return ttypes.Timestamp.NOT_APPLICABLE
elif state == 2: # Announce/scrape is queued
return ttypes.Timestamp.SOON
else:
return ttypes.Timestamp.NOW
# When adding/subtracting months, we may have to keep the
# month between 1-12. We also may have to adjust the year
# and/or day.
# https://stackoverflow.com/a/4131114
month = int(dt.month - 1 + real_months)
year = int(dt.year + month // 12) # Integer division
month = int(month % 12 + 1)
day = min(dt.day, calendar.monthrange(year, month)[1]) # Avoid day 30 in February
dt = dt.replace(year=year, month=month, day=day)
# Subtract the approximate number seconds in the number of months
secs = secs % (real_months * SECONDS[1][1])
except (OverflowError, ValueError):
pass
else:
dt = dt + datetime.timedelta(seconds=secs)
ts = Timestamp(dt.timestamp())
if ts is None:
# Without years or months given, we can simply add our delta to time.time()
ts = Timestamp(time.time() + self)
return ts
"""
Handle next(Announce|Scrape)Time RPC key
`which` must be 'Scrape' or 'Announce'.
transmission.h says:
/* when the next periodic (announce|scrape) message will be sent out.
if (announce|scrape)State isn't TR_TRACKER_WAITING, this field is undefined */
"""
state = tracker['%sState' % which.lower()]
if state == 1: # TR_TRACKER_WAITING = 1
return tracker['next%sTime' % which]
elif state == 0: # Torrent is paused
return ttypes.Timestamp.NOT_APPLICABLE
elif state == 2: # Announce/scrape is queued
return ttypes.Timestamp.SOON
else:
return ttypes.Timestamp.NOW
def _next_time(tracker, which):
"""
Handle next(Announce|Scrape)Time RPC key
`which` must be 'Scrape' or 'Announce'.
transmission.h says:
/* when the next periodic (announce|scrape) message will be sent out.
if (announce|scrape)State isn't TR_TRACKER_WAITING, this field is undefined */
"""
state = tracker['%sState' % which.lower()]
if state == 1: # TR_TRACKER_WAITING = 1
return tracker['next%sTime' % which]
elif state == 0: # Torrent is paused
return ttypes.Timestamp.NOT_APPLICABLE
elif state == 2: # Announce/scrape is queued
return ttypes.Timestamp.SOON
else:
return ttypes.Timestamp.NOW
def check_timestamp_filter(self, filter_cls, filter_names, key, default_sign):
from stig.client.ttypes import Timestamp
items = ({'id': 1, key: Timestamp.from_string('2001-01-01')},
{'id': 2, key: Timestamp.from_string('2002-01-01')},
{'id': 3, key: Timestamp.from_string('2003-01-01')},
{'id': 4, key: Timestamp(Timestamp.NOW)},
{'id': 5, key: Timestamp(Timestamp.UNKNOWN)},
{'id': 6, key: Timestamp(Timestamp.NOT_APPLICABLE)},
{'id': 7, key: Timestamp(Timestamp.NEVER)},
{'id': 8, key: Timestamp(Timestamp.SOON)})
for fn in filter_names:
self._check_timestamp_as_bool(filter_cls, fn, key, items)
self._check_timestamp_with_absolute_times(filter_cls, fn, key, items)
self._check_timestamp_with_relative_times(filter_cls, fn, key, items)
if default_sign == 1:
self._check_timestamp_with_positive_default_sign(filter_cls, fn, key, items)
elif default_sign == -1:
def check_timestamp_filter(self, filter_cls, filter_names, key, default_sign):
    """
    Run the shared timestamp checks against every name in `filter_names`

    Eight items are built, each a dict with an 'id' (1-8) and a Timestamp
    stored under `key`: ids 1-3 hold the dates 2001-01-01, 2002-01-01 and
    2003-01-01, ids 4-8 hold the constants NOW, UNKNOWN, NOT_APPLICABLE,
    NEVER and SOON.

    `default_sign` must be 1 or -1 and selects the positive or negative
    default-sign check.  Any other value raises RuntimeError once the
    first filter name has passed the sign-independent checks.
    """
    from stig.client.ttypes import Timestamp

    values = (
        Timestamp.from_string('2001-01-01'),
        Timestamp.from_string('2002-01-01'),
        Timestamp.from_string('2003-01-01'),
        Timestamp(Timestamp.NOW),
        Timestamp(Timestamp.UNKNOWN),
        Timestamp(Timestamp.NOT_APPLICABLE),
        Timestamp(Timestamp.NEVER),
        Timestamp(Timestamp.SOON),
    )
    items = tuple({'id': item_id, key: value}
                  for item_id, value in enumerate(values, start=1))

    for name in filter_names:
        # Checks that do not depend on the default sign
        self._check_timestamp_as_bool(filter_cls, name, key, items)
        self._check_timestamp_with_absolute_times(filter_cls, name, key, items)
        self._check_timestamp_with_relative_times(filter_cls, name, key, items)

        # Pick the sign-specific check, then run it
        if default_sign == 1:
            check_default_sign = self._check_timestamp_with_positive_default_sign
        elif default_sign == -1:
            check_default_sign = self._check_timestamp_with_negative_default_sign
        else:
            raise RuntimeError('Invalid default_sign: %r' % (default_sign,))
        check_default_sign(filter_cls, name, key, items)
header = {'left': 'Up', 'right': '?/s'}
width = 6
min_width = 6
needed_keys = ('rate-up',)
def get_value(self):
return self._from_cache(_ensure_hide_unit, self.data['rate-up'])
@classmethod
def set_unit(cls, unit):
cls.header['right'] = '%s/s' % unit
COLUMNS['rate-up'] = RateUp
class RateDown(ColumnBase):
    """Column for the 'rate-down' value, shown under the header 'Dn'"""

    needed_keys = ('rate-down',)
    width = min_width = 6
    # The right side of the header is a placeholder until set_unit() is called
    header = dict(left='Dn', right='?/s')

    @classmethod
    def set_unit(cls, unit):
        """Put `unit` per second on the right side of this class's header"""
        cls.header['right'] = '%s/s' % unit

    def get_value(self):
        """
        Return what `_from_cache` yields for `_ensure_hide_unit` and the
        'rate-down' entry of this column's data
        """
        lookup = self._from_cache
        return lookup(_ensure_hide_unit, self.data['rate-down'])


COLUMNS['rate-down'] = RateDown
class LimitRateUp(ColumnBase):
class RateDown(ColumnBase):
    """Column for the 'rate-down' value, shown under the header 'Dn'"""

    width = min_width = 6
    # The right side of the header is a placeholder until set_unit() is called
    header = dict(left='Dn', right='?/s')

    @classmethod
    def set_unit(cls, unit):
        """Put `unit` per second on the right side of this class's header"""
        cls.header['right'] = '%s/s' % unit

    def get_value(self):
        """
        Return what `_from_cache` yields for `_ensure_hide_unit` and the
        'rate-down' entry of this column's data
        """
        lookup = self._from_cache
        return lookup(_ensure_hide_unit, self.data['rate-down'])


COLUMNS['rate-down'] = RateDown
class ETA(ColumnBase):
    """Column for the 'eta' value, shown under the header 'ETA'"""

    min_width = 3
    width = 5
    header = dict(left='ETA')

    def get_value(self):
        """Return the 'eta' entry of this column's data"""
        data = self.data
        return data['eta']


COLUMNS['eta'] = ETA
class RateEst(ColumnBase):
header = {'left': 'Est', 'right': '?/s'}
width = 7
min_width = 7
def get_value(self):
header = {'left': 'LmtDn', 'right': '?/s'}
width = 9
min_width = 9
needed_keys = ('limit-rate-down',)
def get_value(self):
return self._from_cache(_ensure_hide_unit, self.data['limit-rate-down'])
@classmethod
def set_unit(cls, unit):
cls.header['right'] = '%s/s' % unit
COLUMNS['limit-rate-down'] = LimitRateDown
class Eta(ColumnBase):
    """Column for the 'timespan-eta' value, shown under the header 'ETA'"""

    header = {'left': 'ETA'}
    width = 5
    # NOTE(review): this was 9, which is larger than `width` and looks
    # copied from a 9-wide column.  A minimum above the regular width is
    # contradictory, so it is lowered to 3 -- confirm against how
    # ColumnBase consumes `min_width`.
    min_width = 3
    needed_keys = ('timespan-eta',)

    def get_value(self):
        """Return the 'timespan-eta' entry of this column's data"""
        return self.data['timespan-eta']


COLUMNS['eta'] = Eta
class Tracker(ColumnBase):
header = {'left': 'Tracker'}
width = 10
min_width = 5
needed_keys = ('trackers',)