Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def build_airflow_operator(self, task_cls, call_args, call_kwargs):
if try_get_databand_context() is self.dbnd_context:
# we are already in the context of build
return self._build_airflow_operator(
task_cls=task_cls, call_args=call_args, call_kwargs=call_kwargs
)
# we are coming from external world
with dbnd_config.config_layer_context(
self.dbnd_config_layer
) as c, DatabandContext.context(_context=self.dbnd_context) as dc:
return self._build_airflow_operator(
task_cls=task_cls, call_args=call_args, call_kwargs=call_kwargs
)