Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_following_previous_schedule_daily_dag_CEST_to_CET(self):
    """
    Check the schedule neighbours of a daily 03:00 Europe/Zurich DAG.

    The DAG starts at 2018-10-27 03:00 local time. The run before the start
    date is asserted both in local time (+02:00) and in UTC.
    """
    zurich = pendulum.timezone('Europe/Zurich')
    naive_start = datetime.datetime(2018, 10, 27, 3)
    start = zurich.convert(naive_start, dst_rule=pendulum.PRE_TRANSITION)
    start_utc = timezone.convert_to_utc(start)

    dag = DAG('tz_dag', start_date=start, schedule_interval='0 3 * * *')

    # Run preceding the start date: expected one day earlier, still at +02:00.
    previous_utc = dag.previous_schedule(start_utc)
    previous_local = zurich.convert(previous_utc)
    self.assertEqual(previous_local.isoformat(), "2018-10-26T03:00:00+02:00")
    self.assertEqual(previous_utc.isoformat(), "2018-10-26T01:00:00+00:00")

    # NOTE(review): the following run is computed and localized, but the
    # visible code never asserts on it - confirm whether checks are missing.
    following_utc = dag.following_schedule(start_utc)
    following_local = zurich.convert(following_utc)
def test_pickle_timezone():
    """A pendulum timezone must still be a ``Timezone`` after a pickle round trip."""
    for zone_name in ("Europe/Amsterdam", "UTC"):
        original = pendulum.timezone(zone_name)
        restored = pickle.loads(pickle.dumps(original))
        assert isinstance(restored, Timezone)
def test_following_previous_schedule(self):
    """
    Check the schedule neighbours of a five-minute DAG around a DST change.

    The DAG starts at 2018-10-28 02:55 Europe/Zurich while the offset is
    still +02:00; the next run is expected at 02:00 local with offset +01:00.
    """
    zurich = pendulum.timezone('Europe/Zurich')
    naive_start = datetime.datetime(2018, 10, 28, 2, 55)
    start = zurich.convert(naive_start, dst_rule=pendulum.PRE_TRANSITION)
    self.assertEqual(
        start.isoformat(),
        "2018-10-28T02:55:00+02:00",
        "Pre-condition: start date is in DST",
    )

    start_utc = timezone.convert_to_utc(start)
    dag = DAG('tz_dag', start_date=start, schedule_interval='*/5 * * * *')

    # Five minutes later in UTC, but the local wall clock falls back an hour.
    following_utc = dag.following_schedule(start_utc)
    following_local = zurich.convert(following_utc)
    self.assertEqual(following_utc.isoformat(), "2018-10-28T01:00:00+00:00")
    self.assertEqual(following_local.isoformat(), "2018-10-28T02:00:00+01:00")

    # NOTE(review): the previous run is computed and localized, but the
    # visible code never asserts on it - confirm whether checks are missing.
    previous_utc = dag.previous_schedule(start_utc)
    previous_local = zurich.convert(previous_utc)
def test_tzname_fold_attribute_is_honored():
    """``tzname`` must give a different abbreviation when ``fold`` is set to 1."""
    eastern = pendulum.timezone("US/Eastern")
    wall_time = datetime(2014, 11, 2, 1, 30)

    # As constructed (fold left at its default) the zone should report EDT.
    assert eastern.tzname(wall_time) == "EDT"

    # The same wall time with fold=1 should report EST.
    folded = wall_time.replace(fold=1)
    assert eastern.tzname(folded) == "EST"
def test_utcoffset():
    """America/Guayaquil must report an offset of -18000 seconds for the current time."""
    guayaquil = pendulum.timezone("America/Guayaquil")
    current_utc = pendulum.now("UTC")
    assert guayaquil.utcoffset(current_utc) == timedelta(seconds=-18000)
def test_dag_naive_default_args_start_date_with_timezone(self):
    """A DAG takes its timezone name from the ``start_date`` found in ``default_args``."""
    zurich = pendulum.timezone('Europe/Zurich')
    start_date = datetime.datetime(2018, 1, 1, tzinfo=zurich)
    default_args = {'start_date': start_date}

    # Two DAGs are built from the same dict and checked identically.
    # NOTE(review): presumably the second pass guards against the first
    # construction mutating default_args - confirm.
    for _ in range(2):
        dag = DAG('DAG', default_args=default_args)
        self.assertEqual(dag.timezone.name, zurich.name)
import datetime
import os
import pendulum
import pytest
from dagfactory import utils
# Today's date with hour, minute, second and microsecond zeroed out.
NOW = datetime.datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
# NOTE(review): named CET but bound to the "Europe/Amsterdam" zone - confirm the
# name is meant as shorthand for that zone rather than a fixed +01:00 offset.
CET = pendulum.timezone("Europe/Amsterdam")
# pendulum timezone object for UTC, used as tzinfo in the expected values below.
UTC = pendulum.timezone("UTC")
def test_get_start_date_date_no_timezone():
    """``utils.get_datetime`` given a plain ``date`` must equal midnight of that day in UTC."""
    midnight_utc = datetime.datetime(2018, 2, 1, 0, 0, tzinfo=UTC)
    result = utils.get_datetime(datetime.date(2018, 2, 1))
    assert result == midnight_utc
def test_get_start_date_datetime_no_timezone():
    """``utils.get_datetime`` given a ``datetime`` without tzinfo must equal the same instant in UTC."""
    midnight_utc = datetime.datetime(2018, 2, 1, 0, 0, tzinfo=UTC)
    result = utils.get_datetime(datetime.datetime(2018, 2, 1))
    assert result == midnight_utc
def test_get_start_date_relative_time_no_timezone():
    """
    Check the ``execution_date`` stored on a task instance in three setups.

    Covers a task without a DAG, a task attached to a DAG with a
    non-localized execution date, and a task attached to a DAG with an
    Europe/Amsterdam execution date (expected to equal its UTC conversion).

    Plain ``assert`` statements are used because this is a module-level
    function with no ``self``; calling ``self.assertEqual`` here would raise
    ``NameError``.

    NOTE(review): the body exercises TI/DAG execution dates rather than a
    relative-time start date as the function name suggests - confirm that
    this body belongs to this test.
    """
    # check ti without dag (just for bw compat)
    op_no_dag = DummyOperator(task_id='op_no_dag')
    ti = TI(task=op_no_dag, execution_date=NAIVE_DATETIME)
    assert ti.execution_date == DEFAULT_DATE

    # check with dag without localized execution_date
    dag = DAG('dag', start_date=DEFAULT_DATE)
    op1 = DummyOperator(task_id='op_1')
    dag.add_task(op1)
    ti = TI(task=op1, execution_date=NAIVE_DATETIME)
    assert ti.execution_date == DEFAULT_DATE

    # with dag and localized execution_date
    tz = pendulum.timezone("Europe/Amsterdam")
    execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tz)
    utc_date = timezone.convert_to_utc(execution_date)
    ti = TI(task=op1, execution_date=execution_date)
    assert ti.execution_date == utc_date
"""
dbt Cloud Hourly DAG
"""
from airflow import DAG, utils
from airflow.operators import DbtCloudRunJobOperator
from airflow.sensors import DbtCloudRunSensor
from datetime import datetime,timedelta
import pendulum
# Timezone attached to the DAG's start date.
local_tz = pendulum.timezone("America/Los_Angeles")

# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'dwall',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 8, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True,
}

# Cron '0 * * * *' fires at minute zero of every hour; catchup is disabled
# and both concurrency and active runs are capped at one.
dag = DAG(
    'dbt_cloud_hourly_dag',
    concurrency=1,
    max_active_runs=1,
    catchup=False,
    schedule_interval='0 * * * *',
    default_args=default_args,
)

# Reuse this module's docstring as the DAG's markdown documentation.
dag.doc_md = __doc__
# Run hourly DAG through dbt cloud.
run_dbt_cloud_job = DbtCloudRunJobOperator(
task_id='run_dbt_cloud_job',
def ensure_tz_aware(dt: datetime.datetime) -> datetime.datetime:
    """
    Return a datetime that carries timezone information.

    Args
        - dt (datetime): the datetime to check

    Returns
        - datetime: ``dt`` itself when its ``tzinfo`` is set; otherwise the
            result of passing ``dt`` to the ``convert`` method of pendulum's
            "utc" timezone
    """
    if dt.tzinfo is not None:
        # Already timezone-aware: hand the same object back untouched.
        return dt
    utc = pendulum.timezone("utc")
    return utc.convert(dt)