Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build the macro namespace as a fresh dict so that adding entries below
# never mutates the ``additional_macros`` mapping itself. This dict is what
# gets passed to the DAG as ``user_defined_macros``.
macros = dict(**additional_macros)
# Only expose ``source_table`` to templates when one was actually supplied;
# a supplied value overrides any same-named entry from ``additional_macros``.
if source_table is not None:
    macros.update(source_table=source_table)
with DAG(
dag_id=dag_id,
schedule_interval=schedule_interval,
default_args=args,
user_defined_macros=macros,
params=dict(cdr_type=cdr_type),
) as dag:
if staging_view_sql is not None and source_table is not None:
create_staging_view = CreateStagingViewOperator(
task_id="create_staging_view", sql=staging_view_sql,
)
extract = ExtractFromViewOperator(
task_id="extract", sql=extract_sql, pool="postgres_etl"
)
elif filename is not None and len(fields) > 0:
create_staging_view = CreateForeignStagingTableOperator(
task_id="create_staging_view",
program=program,
filename=filename,
fields=fields,
null=null,
header=header,
delimiter=delimiter,
quote=quote,
escape=escape,
encoding=encoding,
)
extract = ExtractFromForeignTableOperator(