Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
max_delay = datetime.timedelta(minutes=60)
dag = models.DAG(dag_id='fail_dag')
task = BashOperator(
task_id='task_with_exp_backoff_and_short_time_interval',
bash_command='exit 1',
retries=3,
retry_delay=delay,
retry_exponential_backoff=True,
max_retry_delay=max_delay,
dag=dag,
owner='airflow',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
ti = TI(
task=task, execution_date=DEFAULT_DATE)
ti.end_date = pendulum.instance(timezone.utcnow())
date = ti.next_retry_datetime()
# between 1 * 2^0.5 and 1 * 2^1 (15 and 30)
period = ti.end_date.add(seconds=1) - ti.end_date.add(seconds=15)
self.assertTrue(date in period)
def serialize_fmt(dt):
p_dt = pendulum.instance(dt)
return dict(dt=p_dt.naive().to_iso8601_string(), tz=p_dt.tzinfo.name)
def _format_export_datetime(dt: datetime) -> str:
"""Format date and time as required by the export format specification."""
tz_str = current_app.config['SHOP_ORDER_EXPORT_TIMEZONE']
localized_dt = pendulum.instance(dt).in_tz(tz_str)
date_time, utc_offset = localized_dt.strftime('%Y-%m-%dT%H:%M:%S|%z') \
.split('|', 1)
if len(utc_offset) == 5:
# Insert colon between hours and minutes.
utc_offset = utc_offset[:3] + ':' + utc_offset[3:]
return date_time + utc_offset
def add(self, **kwargs):
"""Returns a new MayaDT object with the given offsets."""
return self.from_datetime(pendulum.instance(self.datetime()).add(**kwargs))
def _validate_dates(self, start_date, end_date):
assert not (bool(start_date) ^ bool(end_date)), 'Must specify both a start and end date or neither'
assert isinstance(start_date, (datetime.timedelta, datetime.datetime, pendulum.Pendulum)) and isinstance(end_date, (datetime.timedelta, datetime.datetime, pendulum.Pendulum)), 'start_date and end_date must be either datetimes or timedeltas'
assert not (isinstance(start_date, datetime.timedelta) and isinstance(end_date, datetime.timedelta)), 'Only one of start_date and end_date may be a timedelta'
if isinstance(start_date, datetime.datetime):
start_date = pendulum.instance(start_date)
if isinstance(end_date, datetime.datetime):
end_date = pendulum.instance(end_date)
if isinstance(start_date, datetime.timedelta):
start_date = pendulum.instance(end_date + start_date)
if isinstance(end_date, datetime.timedelta):
end_date = pendulum.instance(start_date + end_date)
og_start, og_end = start_date, end_date
start_date, end_date = self.shift_range(start_date, end_date)
assert isinstance(start_date, pendulum.Pendulum) and isinstance(end_date, pendulum.Pendulum), 'transpose_time_window must return a tuple of 2 datetimes'
if (og_start, og_end) != (start_date, end_date):
logger.warning('Date shifted from {} - {} to {} - {}. Disable shifting by passing shift_range=False'.format(og_start, og_end, start_date, end_date))
assert start_date < end_date, 'start_date must be before end_date {} < {}'.format(start_date, end_date)
return start_date, end_date
def slang_date(self, locale="en"):
""""Returns human slang representation of date.
Keyword Arguments:
locale -- locale to translate to, e.g. 'fr' for french.
(default: 'en' - English)
"""
dt = pendulum.instance(self.datetime())
try:
return _translate(dt, locale)
except KeyError:
pass
delta = humanize.time.abs_timedelta(
timedelta(seconds=(self.epoch - now().epoch))
)
format_string = "DD MMM"
if delta.days >= 365:
format_string += " YYYY"
return dt.format(format_string, locale=locale).title()
def __set__(self, instance, value): # type: (typing.Optional['base.Entity'], typing.Union[datetime.datetime, pendulum.DateTime]) -> None
if value is None:
return super().__set__(instance, value)
config = instance._config or utils.Config.factory()
if isinstance(value, datetime.datetime):
if self._is_naive(value):
value = pendulum.instance(value, config.timezone)
else:
value = pendulum.instance(value)
elif isinstance(value, pendulum.DateTime):
pass
else:
raise TypeError('Value which is being set to DateTimeField have to be either '
'datetime.datetime or pendulum.DateTime object!')
super().__set__(instance, value)
def _serialize(self, value, attr, obj, **kwargs): # type: ignore
if value is not None:
dt = pendulum.instance(value)
return dict(dt=dt.naive().to_iso8601_string(), tz=dt.tzinfo.name)
def serialize_timestamp(timestamp):
"""Converts a datetime into seconds since the epoch."""
return int(pendulum.instance(timestamp).timestamp())
def __init__(self, start_date: datetime = None, end_date: datetime = None):
if start_date is not None:
start_date = pendulum.instance(start_date)
if end_date is not None:
end_date = pendulum.instance(end_date)
self.start_date = start_date
self.end_date = end_date