Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
month = int(dt.month - 1 + real_months)
year = int(dt.year + month // 12) # Integer division
month = int(month % 12 + 1)
day = min(dt.day, calendar.monthrange(year, month)[1]) # Avoid day 30 in February
dt = dt.replace(year=year, month=month, day=day)
# Subtract the approximate number of seconds in the number of months
secs = secs % (real_months * SECONDS[1][1])
except (OverflowError, ValueError):
pass
else:
dt = dt + datetime.timedelta(seconds=secs)
ts = Timestamp(dt.timestamp())
if ts is None:
# Without years or months given, we can simply add our delta to time.time()
ts = Timestamp(time.time() + self)
return ts
`which` must be 'Scrape' or 'Announce'.
transmission.h says:
/* when the next periodic (announce|scrape) message will be sent out.
if (announce|scrape)State isn't TR_TRACKER_WAITING, this field is undefined */
"""
state = tracker['%sState' % which.lower()]
if state == 1: # TR_TRACKER_WAITING = 1
return tracker['next%sTime' % which]
elif state == 0: # Torrent is paused
return ttypes.Timestamp.NOT_APPLICABLE
elif state == 2: # Announce/scrape is queued
return ttypes.Timestamp.SOON
else:
return ttypes.Timestamp.NOW
# When adding/subtracting months, we may have to keep the
# month between 1-12. We also may have to adjust the year
# and/or day.
# https://stackoverflow.com/a/4131114
month = int(dt.month - 1 + real_months)
year = int(dt.year + month // 12) # Integer division
month = int(month % 12 + 1)
day = min(dt.day, calendar.monthrange(year, month)[1]) # Avoid day 30 in February
dt = dt.replace(year=year, month=month, day=day)
# Subtract the approximate number of seconds in the number of months
secs = secs % (real_months * SECONDS[1][1])
except (OverflowError, ValueError):
pass
else:
dt = dt + datetime.timedelta(seconds=secs)
ts = Timestamp(dt.timestamp())
if ts is None:
# Without years or months given, we can simply add our delta to time.time()
ts = Timestamp(time.time() + self)
return ts
"""
Handle next(Announce|Scrape)Time RPC key
`which` must be 'Scrape' or 'Announce'.
transmission.h says:
/* when the next periodic (announce|scrape) message will be sent out.
if (announce|scrape)State isn't TR_TRACKER_WAITING, this field is undefined */
"""
state = tracker['%sState' % which.lower()]
if state == 1: # TR_TRACKER_WAITING = 1
return tracker['next%sTime' % which]
elif state == 0: # Torrent is paused
return ttypes.Timestamp.NOT_APPLICABLE
elif state == 2: # Announce/scrape is queued
return ttypes.Timestamp.SOON
else:
return ttypes.Timestamp.NOW
def _next_time(tracker, which):
"""
Handle next(Announce|Scrape)Time RPC key
`which` must be 'Scrape' or 'Announce'.
transmission.h says:
/* when the next periodic (announce|scrape) message will be sent out.
if (announce|scrape)State isn't TR_TRACKER_WAITING, this field is undefined */
"""
state = tracker['%sState' % which.lower()]
if state == 1: # TR_TRACKER_WAITING = 1
return tracker['next%sTime' % which]
elif state == 0: # Torrent is paused
return ttypes.Timestamp.NOT_APPLICABLE
elif state == 2: # Announce/scrape is queued
return ttypes.Timestamp.SOON
else:
return ttypes.Timestamp.NOW
def check_timestamp_filter(self, filter_cls, filter_names, key, default_sign):
from stig.client.ttypes import Timestamp
items = ({'id': 1, key: Timestamp.from_string('2001-01-01')},
{'id': 2, key: Timestamp.from_string('2002-01-01')},
{'id': 3, key: Timestamp.from_string('2003-01-01')},
{'id': 4, key: Timestamp(Timestamp.NOW)},
{'id': 5, key: Timestamp(Timestamp.UNKNOWN)},
{'id': 6, key: Timestamp(Timestamp.NOT_APPLICABLE)},
{'id': 7, key: Timestamp(Timestamp.NEVER)},
{'id': 8, key: Timestamp(Timestamp.SOON)})
for fn in filter_names:
self._check_timestamp_as_bool(filter_cls, fn, key, items)
self._check_timestamp_with_absolute_times(filter_cls, fn, key, items)
self._check_timestamp_with_relative_times(filter_cls, fn, key, items)
if default_sign == 1:
self._check_timestamp_with_positive_default_sign(filter_cls, fn, key, items)
elif default_sign == -1:
def check_timestamp_filter(self, filter_cls, filter_names, key, default_sign):
    """
    Run the timestamp filter checks for every name in `filter_names`

    A fixture of eight items is built. Each item is a dict with an 'id'
    (1 to 8) and a Timestamp stored under `key`: three absolute dates
    followed by the NOW, UNKNOWN, NOT_APPLICABLE, NEVER and SOON constants.

    For each filter name the boolean, absolute-time and relative-time checks
    are run, followed by the check matching `default_sign`: 1 for the
    positive variant, -1 for the negative variant.

    Raises RuntimeError if `default_sign` is neither 1 nor -1. This happens
    after the first three checks of the first filter name have run.
    """
    from stig.client.ttypes import Timestamp

    timestamps = (
        Timestamp.from_string('2001-01-01'),
        Timestamp.from_string('2002-01-01'),
        Timestamp.from_string('2003-01-01'),
        Timestamp(Timestamp.NOW),
        Timestamp(Timestamp.UNKNOWN),
        Timestamp(Timestamp.NOT_APPLICABLE),
        Timestamp(Timestamp.NEVER),
        Timestamp(Timestamp.SOON),
    )
    # IDs are simply the 1-based position of each timestamp
    items = tuple({'id': item_id, key: ts}
                  for item_id, ts in enumerate(timestamps, start=1))

    for filter_name in filter_names:
        check_args = (filter_cls, filter_name, key, items)
        self._check_timestamp_as_bool(*check_args)
        self._check_timestamp_with_absolute_times(*check_args)
        self._check_timestamp_with_relative_times(*check_args)

        if default_sign == 1:
            self._check_timestamp_with_positive_default_sign(*check_args)
            continue
        if default_sign == -1:
            self._check_timestamp_with_negative_default_sign(*check_args)
            continue
        raise RuntimeError('Invalid default_sign: %r' % (default_sign,))
def timestamp_or_timedelta(string, default_sign=1):
    """
    Parse `string` as a Timestamp, falling back to a Timedelta

    Timestamp.from_string() is tried first. If it raises ValueError,
    `string` is handed to Timedelta.from_string() instead; any exception
    from that call propagates to the caller.

    A parsed Timedelta has an explicit direction if the stripped string
    starts with 'in', ends with 'ago', or begins with '+' or '-'. Without
    an explicit direction, `default_sign` decides: a negative value returns
    the delta's `inverse`, anything else returns the delta unchanged.
    `default_sign` should be either 1 (default) or -1.
    """
    try:
        return Timestamp.from_string(string)
    except ValueError:
        delta = Timedelta.from_string(string)

        # Only fall back to default_sign if the user did not state a direction
        text = string.strip()
        explicit_direction = (text.startswith('in') or
                              text.endswith('ago') or
                              text[0] in ('+', '-'))
        if not explicit_direction and default_sign < 0:
            return delta.inverse
        return delta
def timestamp(self):
if self == self.UNKNOWN:
return Timestamp(Timestamp.UNKNOWN)
elif self == self.NOT_APPLICABLE:
return Timestamp(Timestamp.NOT_APPLICABLE)
else:
ts = None
real_years, real_months = self._real_years, self._real_months
if real_years or real_months:
# Because we can't know the exact number of seconds in a
# year/month, we take the number of years/months provided by the
# parser and use datetime and timedelta to calculate the
# timestamp.
secs = int(self)
dt = datetime.datetime.now()
# datetime will throw OverflowError if numbers are too large
try:
if real_years is not None:
dt = dt.replace(year=int(dt.year + real_years))
# Subtract the approximate number of seconds in the number of years
secs = secs % (real_years * SECONDS[0][1])
def cmp_timestamp_or_timdelta(item_value, op, user_value):
"""Compare any combination of Timestamp and Timedelta objects"""
if not item_value:
return False
type_item_value = type(item_value)
type_user_value = type(user_value)
if type_user_value is Timestamp:
if type_item_value is Timestamp:
# result = op(item_value, user_value)
# log.debug('1: %r %s %r = %r', item_value, op.__name__, user_value, result)
# return result
return op(item_value, user_value)
elif type_item_value is Timedelta:
# result = op(item_value, user_value.timedelta)
# log.debug('2: %r %s %r (%r) = %r',
# item_value, op.__name__, user_value, user_value.timedelta, result)
# return result
return op(item_value, user_value.timedelta)
elif type_user_value is Timedelta:
# If the filter is, e.g., 'less than 1y ago', future dates would match,
# but we interpret the filter as 'less than 1y ago up till now'. Same
# thing for 'in less than 1y' - past dates technically match, but they
# shouldn't.