Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_text_timezone(self):
    """Convert a date string with the timezone given by name ('UTC').

    The result must compare equal to midnight on 2009-08-01 carrying
    ``pytz.utc`` as its tzinfo.
    """
    expected = datetime(2009, 8, 1, tzinfo=pytz.utc)
    result = convert_to_datetime('2009-8-1', 'UTC', None)
    assert result == expected
def test_date(self, timezone, input, expected):
# Checks the result of convert_to_datetime(input, timezone, None) against `expected`.
# NOTE(review): indentation was lost in this copy, so the nesting of the statements
# below is inferred -- presumably the final assert runs unconditionally (covering the
# expected-is-None case too); confirm against the original file.
returned = convert_to_datetime(input, timezone, None)
if expected is not None:
assert isinstance(returned, datetime)
# An expected value without tzinfo is localized to `timezone` before comparing.
expected = timezone.localize(expected) if not expected.tzinfo else expected
assert returned == expected
# NOTE(review): fragment of a constructor body. The enclosing def and the code that
# first sets self.interval / self.interval_length are not visible in this copy.
# A zero-length interval is replaced by a one-second interval.
if self.interval_length == 0:
self.interval = timedelta(seconds=1)
self.interval_length = 1
# Timezone precedence: the explicit `timezone` argument, then start_date's tzinfo,
# then end_date's tzinfo, and finally whatever get_localzone() returns.
if timezone:
self.timezone = astimezone(timezone)
elif start_date and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif end_date and end_date.tzinfo:
self.timezone = end_date.tzinfo
else:
self.timezone = get_localzone()
# With no start_date given, default to "now + interval" in the chosen timezone.
start_date = start_date or (datetime.now(self.timezone) + self.interval)
# Both bounds are passed through convert_to_datetime with the chosen timezone.
self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
def __init__(self, year=None, month=None, day=None, week=None, day_of_week=None, hour=None,
minute=None, second=None, start_date=None, end_date=None, timezone=None,
jitter=None):
# Timezone precedence: the explicit `timezone` argument, then the tzinfo of a
# datetime start_date, then the tzinfo of a datetime end_date, and finally
# whatever get_localzone() returns.
if timezone:
self.timezone = astimezone(timezone)
elif isinstance(start_date, datetime) and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif isinstance(end_date, datetime) and end_date.tzinfo:
self.timezone = end_date.tzinfo
else:
self.timezone = get_localzone()
# Both bounds are passed through convert_to_datetime with the chosen timezone.
self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
self.jitter = jitter
# Keep only the arguments whose names appear in FIELD_NAMES and that are not None.
values = dict((key, value) for (key, value) in locals().items()
if key in self.FIELD_NAMES and value is not None)
self.fields = []
# assign_defaults becomes True once the last explicitly supplied field has been
# popped from `values`; later names in FIELD_NAMES then use DEFAULT_VALUES.
assign_defaults = False
for field_name in self.FIELD_NAMES:
if field_name in values:
exprs = values.pop(field_name)
is_default = False
assign_defaults = not values
elif assign_defaults:
exprs = DEFAULT_VALUES[field_name]
is_default = True
else:
# NOTE(review): the body of this branch and the rest of the loop are cut off in this copy.
# NOTE(review): fragment of a method that moves validated entries from `changes`
# into `approved`. The def line and whatever guards this first statement are not
# visible in this copy.
trigger = changes.pop('trigger')
if not isinstance(trigger, BaseTrigger):
raise TypeError('Expected a trigger instance, got %s instead' %
trigger.__class__.__name__)
approved['trigger'] = trigger
# The executor is accepted only as a string.
if 'executor' in changes:
value = changes.pop('executor')
if not isinstance(value, six.string_types):
raise TypeError('executor must be a string')
approved['executor'] = value
# next_run_time is passed through convert_to_datetime with self._scheduler.timezone.
if 'next_run_time' in changes:
value = changes.pop('next_run_time')
approved['next_run_time'] = convert_to_datetime(value, self._scheduler.timezone,
'next_run_time')
# Any key still left in `changes` was not popped by the checks above and is rejected.
if changes:
raise AttributeError('The following are not modifiable attributes of Job: %s' %
', '.join(changes))
# Apply every approved value as an attribute on this object.
for key, value in six.iteritems(approved):
setattr(self, key, value)
def __init__(self, min_interval, max_interval=None, start_date=None):
# min_interval must be a timedelta; max_interval is type-checked only when truthy.
if not isinstance(min_interval, timedelta):
raise TypeError('min_interval must be a timedelta')
if max_interval and not isinstance(max_interval, timedelta):
raise TypeError('max_interval must be a timedelta')
# A truthy start_date is passed through convert_to_datetime.
if start_date:
start_date = convert_to_datetime(start_date)
self.min_interval = min_interval
self.min_interval_length = timedelta_seconds(self.min_interval)
# A zero-length minimum interval is replaced by one second.
if self.min_interval_length == 0:
self.min_interval = timedelta(seconds=1)
self.min_interval_length = 1
self.max_interval = max_interval
# max_interval_length stays None when no (truthy) max_interval is supplied.
self.max_interval_length = None
if max_interval:
self.max_interval_length = timedelta_seconds(self.max_interval)
# The same one-second floor is applied to the maximum interval.
if self.max_interval_length == 0:
self.max_interval = timedelta(seconds=1)
self.max_interval_length = 1
# NOTE(review): the condition below and the rest of the method are cut off in this copy.
if(self.max_interval and
def __init__(self, **values):
# start_date is removed from `values` so it is not treated as a field below.
self.start_date = values.pop('start_date', None)
if self.start_date:
self.start_date = convert_to_datetime(self.start_date)
# Yank out all None valued fields
# (iterates over a list() copy so entries can be deleted from `values` in the loop)
for key, value in list(iteritems(values)):
if value is None:
del values[key]
self.fields = []
# assign_defaults becomes True once the last explicitly supplied field has been
# popped from `values`; later names in FIELD_NAMES then use DEFAULT_VALUES.
assign_defaults = False
for field_name in self.FIELD_NAMES:
if field_name in values:
exprs = values.pop(field_name)
is_default = False
assign_defaults = not values
elif assign_defaults:
exprs = DEFAULT_VALUES[field_name]
is_default = True
# NOTE(review): the remainder of this loop is cut off in this copy.
def __init__(self, run_date=None, timezone=None):
    """Store the moment this trigger refers to in ``self.run_date``.

    :param run_date: value handed to ``convert_to_datetime``; when ``None``,
        the current time in the resolved timezone is used instead
    :param timezone: value handed to ``astimezone``; when that yields a
        falsy result, ``get_localzone()`` supplies the timezone
    """
    tz = astimezone(timezone) or get_localzone()
    self.run_date = (
        datetime.now(tz)
        if run_date is None
        else convert_to_datetime(run_date, tz, 'run_date')
    )
def __init__(self, year=None, month=None, day=None, week=None, day_of_week=None, hour=None,
minute=None, second=None, start_date=None, end_date=None, timezone=None,
jitter=None):
# Timezone precedence: the explicit `timezone` argument, then the tzinfo of a
# datetime start_date, then the tzinfo of a datetime end_date, and finally
# whatever get_localzone() returns.
if timezone:
self.timezone = astimezone(timezone)
elif isinstance(start_date, datetime) and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif isinstance(end_date, datetime) and end_date.tzinfo:
self.timezone = end_date.tzinfo
else:
self.timezone = get_localzone()
# Both bounds are passed through convert_to_datetime with the chosen timezone.
self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
self.jitter = jitter
# Keep only the arguments whose names appear in FIELD_NAMES and that are not None.
values = dict((key, value) for (key, value) in locals().items()
if key in self.FIELD_NAMES and value is not None)
self.fields = []
# assign_defaults becomes True once the last explicitly supplied field has been
# popped from `values`; later names in FIELD_NAMES then use DEFAULT_VALUES.
assign_defaults = False
for field_name in self.FIELD_NAMES:
if field_name in values:
exprs = values.pop(field_name)
is_default = False
assign_defaults = not values
elif assign_defaults:
exprs = DEFAULT_VALUES[field_name]
is_default = True
# NOTE(review): the remainder of this loop is cut off in this copy.