Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def validate_task_inputs(self):
    """Check that the task is allowed to run and that its user inputs exist.

    Two checks are performed, in order:

    1. If ``self.task.ctrl.should_run()`` is false, the result of
       ``find_non_completed(self.relations.task_outputs_user)`` is rendered
       with ``non_completed_outputs_to_str`` and reported.
    2. Every item yielded by ``flatten(self.relations.task_inputs_user)`` is
       asked whether it ``exists()``; the ones that do not are collected
       and reported together.

    Raises:
        DatabandConfigError: when ``should_run()`` returns a falsy value.
        Whatever ``friendly_error.task_data_source_not_exists`` builds:
            when at least one user input does not exist.
    """
    if not self.task.ctrl.should_run():
        not_completed = find_non_completed(self.relations.task_outputs_user)
        details = non_completed_outputs_to_str(not_completed)
        message = (
            "You are missing some input tasks in your pipeline! \n\t%s\n"
            "The task execution was disabled for '%s'."
        ) % (details, self.task.task_id)
        raise DatabandConfigError(message)

    # Gather every absent input first so that a single error names them all.
    absent_inputs = [
        candidate
        for candidate in flatten(self.relations.task_inputs_user)
        if not candidate.exists()
    ]
    if not absent_inputs:
        return

    raise friendly_error.task_data_source_not_exists(
        self, absent_inputs, downstream=[self.task]
    )
use_cache &= target._cache.get(PANDAS_CACHE_KEY) is not None
use_cache &= (
"key" not in read_kwargs
) # support for hdf5, we don't support non default key
if not use_cache:
try:
logger.info("Loading data frame from target='%s'", target)
if self.support_direct_read(target):
df = self._pd_read(target.path, **read_kwargs)
else:
with target.open("r") as fp:
df = self._pd_read(fp, **read_kwargs)
except Exception as ex:
raise friendly_error.failed_to_read_pandas(ex, target)
else:
logger.info("Loading data frame from cache: %s", target)
df = target._cache.get(PANDAS_CACHE_KEY)
if no_copy_on_read:
if set_index:
logger.warning(
"You are using no_copy_on_read, with set_index, "
"that will change df in cache for all usages: %s" % target
)
else:
df = df.copy()
if set_index:
try:
df = _data_frame_set_index(df, set_index)
except Exception as ex:
def build_kube_dbnd(self, in_cluster=None):
    """Build a ``DbndKubernetesClient`` that uses this object as its engine config.

    Args:
        in_cluster: forwarded unchanged to ``self.get_kube_client``.

    Returns:
        A ``DbndKubernetesClient`` constructed with the obtained kube client
        and ``engine_config=self``.

    Raises:
        Whatever ``friendly_error.executor_k8s.failed_to_connect_to_cluster``
        builds, when ``get_kube_client`` raises ``ConfigException``.
    """
    # Both imports are deliberately function-local, as in the original code.
    from dbnd_docker.kubernetes.kube_dbnd_client import DbndKubernetesClient
    from kubernetes.config import ConfigException

    try:
        client = self.get_kube_client(in_cluster=in_cluster)
    except ConfigException as config_error:
        # NOTE(review): the report uses self.in_cluster, not the `in_cluster`
        # argument -- confirm this is intended when a caller overrides it.
        connect_failure = friendly_error.executor_k8s.failed_to_connect_to_cluster(
            self.in_cluster, config_error
        )
        raise connect_failure

    return DbndKubernetesClient(kube_client=client, engine_config=self)
target = self.target
if isinstance(target, MultiTarget):
if m.support_multi_target_direct_read:
return m.target_to_value(target, **kwargs)
elif isinstance(target, DirTarget):
if m.support_directory_direct_read and m.support_direct_access(target):
return m.target_to_value(target, **kwargs)
elif isinstance(target, FileTarget):
return m.target_to_value(target, **kwargs)
partitions = target.list_partitions()
if len(partitions) == 1:
return m.target_to_value(partitions[0], **kwargs)
if not self.value_type.support_merge:
raise friendly_error.marshaller_no_merge(self, target, partitions)
# Concatenate all data into one DataFrame
# We don't want list to be stored in memory
# however, concat does list() on the iterator as one of the first things
partitions_values = [m.target_to_value(t, **kwargs) for t in partitions]
return self.value_type.merge_values(*partitions_values)
if (
"Insufficient cpu" in condition.message
or "Insufficient memory" in condition.message
):
if self.kube_config.check_cluster_resource_capacity:
kube_resources_checker = DbndKubeResourcesChecker(
kube_client=self.kube_client,
kube_config=self.kube_config,
)
kube_resources_checker.check_if_resource_request_above_max_capacity(
condition.message
)
logger.warning("pod is pending because %s" % condition.message)
else:
raise friendly_error.executor_k8s.kubernetes_pod_unschedulable(
condition.message
)
if pod_status.container_statuses:
container_waiting_state = pod_status.container_statuses[0].state.waiting
if pod_status.phase == "Pending" and container_waiting_state:
if container_waiting_state.reason == "ErrImagePull":
logger.info(
"Found problematic condition at %s :%s %s",
self.name,
container_waiting_state.reason,
container_waiting_state.message,
)
raise friendly_error.executor_k8s.kubernetes_image_not_found(
pod_status.container_statuses[0].image,
container_waiting_state.message,
def assert_plugin_enabled(module, reason=None, module_import=None):
    """Return ``True`` if the plugin is enabled, otherwise raise.

    Args:
        module: passed to ``is_plugin_enabled`` and, on failure, to the
            error factory.
        reason: passed to the error factory only.
        module_import: forwarded to ``is_plugin_enabled`` as a keyword.

    Raises:
        Whatever ``friendly_error.config.missing_module(module, reason)``
        builds, when ``is_plugin_enabled`` returns a falsy value.
    """
    enabled = is_plugin_enabled(module, module_import=module_import)
    if enabled:
        return True
    raise friendly_error.config.missing_module(module, reason)
return traverse(value, self.value_type.target_to_value)
# usually we should not load "outputs" on read
if self.is_output():
# actually we should not load it, so just return
return value
if isinstance(value, Target):
try:
runtime_value = self.load_from_target(value)
if self.is_input():
self._log_target_on_read(runtime_value, value, task)
return runtime_value
except Exception as ex:
raise friendly_error.failed_to_read_target_as_task_input(
ex=ex, task=task, parameter=self, target=value
)
if (
isinstance(self.value_type, _StructureValueType)
and self.value_type.sub_value_type
):
try:
def load_with_preview(val):
runtime_val = self.value_type.sub_value_type.load_runtime(val)
if self.is_input() and isinstance(val, Target):
# Optimisation opportunity: log all targets in a single call
self._log_target_on_read(runtime_val, val, task)
return runtime_val
def _initialize(self):
    """Fill in root folders and submit flags that were left unset.

    After delegating to the parent ``_initialize``:

    * ``dbnd_root`` falls back to ``self.root.folder("dbnd")`` when falsy.
    * ``dbnd_local_root`` falls back to ``dbnd_root``, but only when
      ``dbnd_root.is_local()`` is true; otherwise an error is raised.
    * ``dbnd_data_sync_root`` falls back to ``self.dbnd_root.folder("sync")``.
    * ``submit_driver`` and ``submit_tasks`` fall back to
      ``bool(self.remote_engine)`` when they are ``None``.

    Raises:
        Whatever ``friendly_error.task_build.failed_to_access_dbnd_home``
        builds, for any ``Exception`` raised while resolving the roots.
    """
    super(EnvConfig, self)._initialize()

    try:
        configured_root = self.dbnd_root
        self.dbnd_root = (
            configured_root if configured_root else self.root.folder("dbnd")
        )

        local_root_unset = not self.dbnd_local_root
        if local_root_unset and not self.dbnd_root.is_local():
            # NOTE(review): this raise sits inside the try, so if the error is
            # an Exception subclass it is caught below and wrapped -- confirm.
            raise friendly_error.config.dbnd_root_local_not_defined(self.name)
        if local_root_unset:
            self.dbnd_local_root = self.dbnd_root
    except Exception as e:
        raise friendly_error.task_build.failed_to_access_dbnd_home(
            self.dbnd_root, e
        )

    if not self.dbnd_data_sync_root:
        self.dbnd_data_sync_root = self.dbnd_root.folder("sync")

    # An explicit False is preserved; only None is replaced. remote_engine is
    # read only when a flag actually needs the default.
    for flag_name in ("submit_driver", "submit_tasks"):
        if getattr(self, flag_name) is None:
            setattr(self, flag_name, bool(self.remote_engine))
container_waiting_state = pod_status.container_statuses[0].state.waiting
if pod_status.phase == "Pending" and container_waiting_state:
if container_waiting_state.reason == "ErrImagePull":
logger.info(
"Found problematic condition at %s :%s %s",
self.name,
container_waiting_state.reason,
container_waiting_state.message,
)
raise friendly_error.executor_k8s.kubernetes_image_not_found(
pod_status.container_statuses[0].image,
container_waiting_state.message,
)
if container_waiting_state.reason == "CreateContainerConfigError":
raise friendly_error.executor_k8s.kubernetes_pod_config_error(
container_waiting_state.message
)
task.on_kill()
else:
logger.warning(
"Task is not killed accordingly to user input!"
)
else:
task.on_kill()
except Exception:
logger.exception("Failed to kill task on user keyboard interrupt")
task_run.set_task_run_state(TaskRunState.CANCELLED, error=error)
run._internal_kill()
raise
except SystemExit as ex:
error = TaskRunError.buid_from_ex(ex, task_run)
task_run.set_task_run_state(TaskRunState.CANCELLED, error=error)
raise friendly_error.task_execution.system_exit_at_task_run(task, ex)
except Exception as ex:
error = TaskRunError.buid_from_ex(ex, task_run)
task_run.set_task_run_state(TaskRunState.FAILED, error=error)
show_error_once.set_shown(ex)
raise
finally:
task_run.airflow_context = None