Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
attach = AttachOperator(task_id="attach")
analyze = AnalyzeOperator(
task_id="analyze", target="{{ extract_table }}", pool="postgres_etl",
)
latest_only = LatestOnlyOperator(task_id="analyze_parent_only_for_new")
analyze_parent = AnalyzeOperator(
task_id="analyze_parent", target="{{ parent_table }}", pool="postgres_etl",
)
update_records = UpdateETLTableOperator(task_id="update_records")
create_staging_view >> check_not_empty >> check_not_in_flux >> extract
from_stage = extract
if cluster_field is not None:
cluster = ClusterOperator(
task_id="cluster", cluster_field=cluster_field, pool="postgres_etl"
)
extract >> cluster
from_stage = cluster
from_stage >> [
add_constraints,
add_indexes,
] >> analyze >> attach >> latest_only >> analyze_parent
attach >> [update_records, *get_qa_checks()]
globals()[dag_id] = dag
return dag