# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def run_and_get(dag, task_id, execution_date=None):
    """Run *dag* once and return the result produced by *task_id*.

    When no execution date is given, the current UTC time is used.
    If the run fails, the location of the Airflow task logs is logged
    and the original exception is re-raised.
    """
    if execution_date is None:
        execution_date = utcnow()

    try:
        _run_dag(dag, execution_date=execution_date)
    except Exception:
        logger.exception("Failed to run %s %s %s", dag.dag_id, task_id, execution_date)

        # Imported lazily so Airflow config is only touched on the failure path.
        from airflow.configuration import conf as airflow_conf

        base_log_folder = os.path.expanduser(
            airflow_conf.get("core", "BASE_LOG_FOLDER")
        )
        task_log_path = os.path.join(
            base_log_folder, dag.dag_id, task_id, execution_date.isoformat()
        )
        logger.info("Check logs at %s", task_log_path)
        raise

    return _get_result(dag, task_id, execution_date=execution_date)
def wait(self):
    """
    Waits for pod completion.

    Streams pod logs while the pod runs, then polls the Airflow task state
    every 5 seconds until it reaches a terminal state.

    :raises DatabandRuntimeError: if the pod is not in a final state after
        ``kube_config.submit_termination_grace_period``.
    :return:
    """
    self._wait_for_pod_started()
    logger.info("Pod '%s' is running, reading logs..", self.name)
    self.stream_pod_logs(follow=True)
    logger.info("Successfully read %s pod logs", self.name)

    from airflow.utils.state import State

    final_state = self.get_airflow_state()
    wait_start = utcnow()
    while final_state not in {State.SUCCESS, State.FAILED}:
        logger.debug(
            "Pod '%s' is not completed with state %s, waiting..",
            self.name,
            final_state,
        )
        # Give up once the configured grace period has elapsed.
        if (
            utcnow() - wait_start
        ) > self.kube_config.submit_termination_grace_period:
            raise DatabandRuntimeError(
                "Pod is not in a final state after {grace_period}: {state}".format(
                    grace_period=self.kube_config.submit_termination_grace_period,
                    state=final_state,
                )
            )
        time.sleep(5)
        # BUG FIX: re-read the state every iteration. The original never
        # refreshed `final_state`, so a pod that started in a non-final
        # state spun in this loop until the grace period expired, even
        # after the pod finished. (The sibling copy of this wait logic in
        # this file performs exactly this refresh.)
        final_state = self.get_airflow_state()
def _log_param(self, run_id, param):
    # type: (str, mlflow.entities.Param) -> None
    """Record an MLflow param on the dbnd store.

    Params are currently persisted as metrics (temporary workaround).
    """
    as_metric = Metric(key=param.key, value=param.value, timestamp=utcnow())
    current_task_run = self._get_current_task_run()
    self.dbnd_store.log_metric(current_task_run, as_metric)
    logger.info("Param {}".format(param))
# Resolve (or lazily create) the TargetInfo record for `target_path`.
# NOTE(review): fragment of a larger function — `targets`, `target_path`,
# `target`, `run`, `name` and `task_targets` come from the enclosing scope.
dbnd_target = targets.get(target_path)
if not dbnd_target:
    # we see this target for the first time
    target_task_run_uid = (
        None
    )  # assume for now that the Target is not owned by any task
    # try to find its owner, so we create a target that relates to some Task
    # if `task` is a pipeline, the target owner is going to be a different task
    if target.task:
        target_task_run = run.get_task_run(target.task.task_id)
        if target_task_run:
            target_task_run_uid = target_task_run.task_run_uid

    # Cache the new record so later references to the same path reuse it.
    dbnd_target = targets[target_path] = TargetInfo(
        path=target_path,
        created_date=utcnow(),
        task_run_uid=target_task_run_uid,
        parameter_name=name,
    )
    logger.debug(
        "New Target: %s -> %s -> %s",
        target.task,
        target_task_run_uid,
        target_path,
    )
task_targets.append(dbnd_target)
def stop(self, at_exit=True, update_run_state=True):
    """Finalize the active databand run and tear down open contexts.

    When ``update_run_state`` is set, the root task run and the run itself
    are marked FAILED (if any task run failed) or SUCCESS, and the finish
    banner is logged. When ``at_exit`` is set and Airflow is enabled, the
    Airflow ORM session is disposed as well.
    """
    if update_run_state:
        databand_run = try_get_databand_run()
        if databand_run:
            root_task_run = databand_run.task.current_task_run
            root_task_run.finished_time = utcnow()

            # A single failed task run fails the whole run.
            any_failed = any(
                tr.task_run_state == TaskRunState.FAILED
                for tr in databand_run.task_runs
            )
            if any_failed:
                root_task_run.set_task_run_state(TaskRunState.UPSTREAM_FAILED)
                databand_run.set_run_state(RunState.FAILED)
            else:
                root_task_run.set_task_run_state(TaskRunState.SUCCESS)
                databand_run.set_run_state(RunState.SUCCESS)

            logger.info(databand_run.describe.run_banner_for_finished())

    self._close_all_context_managers()

    if at_exit and is_airflow_enabled():
        from airflow.settings import dispose_orm

        dispose_orm()
def set_task_run_state(
    self,
    task_run,
    state,
    error=None,
    timestamp=None,
    do_not_update_start_date=False,
):
    """Report a task-run state change via the console (log) store.

    Delegates bookkeeping to the parent store, then prepares a colored,
    human-readable message for the new state.

    NOTE(review): this block appears truncated here — `task_msg`, `color`,
    `level` and `show_simple_log` are computed but consumed below the
    visible span.
    """
    super(ConsoleStore, self).set_task_run_state(task_run=task_run, state=state)
    task = task_run.task

    # optimize, don't print success banner for fast running tasks
    # (finished in under 5 seconds; missing start_time falls back to "now",
    # which makes the delta negative and thus counts as quick)
    quick_task = task_run.finished_time and (
        task_run.finished_time
        - (task_run.start_time if task_run.start_time else utcnow())
    ) < timedelta(seconds=5)
    show_simple_log = not self.verbose and (
        task_run.task.task_is_system or quick_task
    )

    # Defaults; overridden per terminal state below.
    level = logging.INFO
    color = "cyan"
    task_friendly_id = task_run.task_af_id

    if state in [TaskRunState.RUNNING, TaskRunState.QUEUED]:
        task_msg = "Running task %s" % task_friendly_id

    elif state == TaskRunState.SUCCESS:
        task_msg = "Task %s has been completed!" % (task_friendly_id)
        color = "green"

    elif state == TaskRunState.FAILED:
        task_msg = "Task %s has failed!" % (task_friendly_id)
        color = "red"
        level = logging.ERROR
# Build a ScheduledJobInfo entry for `dag` and start collecting its runs.
# NOTE(review): fragment of a larger export/sync function — `dag`, `result`,
# `job_uid`, `dagruns`, `_to_task_def`, `_get_upstream_map` and
# `_to_dbnd_run` come from the enclosing scope; the loop body is truncated.
dag_id = dag.dag_id
result.scheduled_job_infos.append(
    ScheduledJobInfo(
        uid=job_uid(dag_id),
        name=dag_id,
        cmd="",
        start_date=dag.start_date,
        end_date=dag.end_date,
        schedule_interval=dag.schedule_interval,
        catchup=dag.catchup,
        # Fields below are not tracked for Airflow-sourced jobs.
        depends_on_past=None,
        retries=None,
        active=None,
        create_user=dag.owner,
        create_time=utcnow(),
        update_user=None,
        update_time=None,
        from_file=False,
        deleted_from_file=False,
        list_order=None,
        job_name=dag_id,
    )
)
task_defs = {
    task.task_id: _to_task_def(task) for task in dag.tasks
}  # type: Dict[str, TaskDefinitionInfo]
upstream_map = list(_get_upstream_map(dag))
for dagrun in dagruns[dag_id]:
    run_info = _to_dbnd_run(dag, dagrun)
# NOTE(review): fragment — interior of a pod wait() method; `self` and the
# enclosing `def` header are above the visible span.
logger.info("Pod '%s' is running, reading logs..", self.name)
self.stream_pod_logs(follow=True)
logger.info("Successfully read %s pod logs", self.name)

from airflow.utils.state import State

# Poll the Airflow task state every 5 seconds until it is terminal.
final_state = self.get_airflow_state()
wait_start = utcnow()
while final_state not in {State.SUCCESS, State.FAILED}:
    logger.debug(
        "Pod '%s' is not completed with state %s, waiting..",
        self.name,
        final_state,
    )
    # Give up once the configured grace period has elapsed.
    if (
        utcnow() - wait_start
    ) > self.kube_config.submit_termination_grace_period:
        raise DatabandRuntimeError(
            "Pod is not in a final state after {grace_period}: {state}".format(
                grace_period=self.kube_config.submit_termination_grace_period,
                state=final_state,
            )
        )
    time.sleep(5)
    # Refresh the state each iteration so the loop can actually terminate.
    final_state = self.get_airflow_state()

if final_state != State.SUCCESS:
    raise DatabandRuntimeError(
        "Pod returned a failure: {state}".format(state=final_state)
    )
return self
    # NOTE(review): fragment of a SIGQUIT stack-dump handler — the opening
    # `try:` and the `code`, `id_to_name`, `_p`, `dump_file` and
    # `SIGQUIT_DUMP_DIR` bindings live above the visible span.
    # Render the Python stack of every live thread into `code`.
    for thread_id, stack in sys._current_frames().items():
        code.append(
            "\n# Thread: {}({})".format(id_to_name.get(thread_id, ""), thread_id)
        )
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append(
                'File: "{}", line {}, in {}'.format(filename, line_number, name)
            )
            if line:
                code.append(" {}".format(line.strip()))
    traceback_data = "\n".join(code)
    _p("%s\n" % traceback_data)

    # dump_file=True means "use a timestamped default path";
    # a string value is used as-is.
    if dump_file is True:
        dump_file = os.path.join(
            SIGQUIT_DUMP_DIR,
            "dbnd.dump.%s.txt" % (utcnow().strftime("%Y%m%d-%H%M%S")),
        )
    if dump_file:
        with open(dump_file, "wt") as df_fp:
            df_fp.write(traceback_data)
        _p("Stack has been dumped into {}".format(dump_file))
    return traceback_data
except Exception as e:
    # Last-resort guard: a signal handler must never raise.
    # NOTE(review): the message has a typo ("reciving") and passes
    # %-style args to print(), which does not format them — left
    # untouched in this documentation-only pass.
    print(
        "Couldn't report stack traces after reciving SIGQUIT! Exception: %s", str(e)
    )
def _run_to_run_info(self):
    # type: () -> RunInfo
    """Snapshot the current run as a RunInfo record (state RUNNING, started now).

    NOTE(review): the `RunInfo(...)` call is truncated at the end of the
    visible span; the remaining keyword arguments and closing parenthesis
    continue below.
    """
    run = self.run
    task = run.driver_task_run.task
    context = run.context
    env = run.env
    return RunInfo(
        run_uid=run.run_uid,
        job_name=run.job_name,
        user=context.task_run_env.user,
        name=run.name,
        state=RunState.RUNNING,
        start_time=utcnow(),
        end_time=None,
        description=run.description,
        is_archived=run.is_archived,
        env_name=env.name,
        cloud_type=env.cloud_type,
        # deprecate and airflow
        dag_id=run.dag_id,
        execution_date=run.execution_date,
        cmd_name=context.name,
        # prefer the remote engine; fall back to the local one
        driver_name=env.remote_engine or env.local_engine,
        # move to task
        target_date=task.task_target_date,
        version=task.task_version,
        # root and submitted by
        root_run=run.root_run_info,
        scheduled_run=run.scheduled_run_info,