# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _delete_crd_resources(
    self,
    db: DBInterface,
    db_session: Session,
    namespace: str,
    label_selector: str = None,
    force: bool = False,
):
    """Delete this runtime's CRD objects in *namespace*.

    Lists the custom objects identified by ``self._get_crd_info()`` (group,
    version, plural), optionally narrowed by *label_selector*, and iterates
    them for deletion. A 404 from the list call (CRD not installed in the
    cluster) is silently ignored.

    :param db: database interface (presumably used further down — the loop
        body continues beyond this chunk; TODO confirm)
    :param db_session: database session paired with *db*
    :param namespace: k8s namespace to operate in
    :param label_selector: optional k8s label selector to narrow the listing
    :param force: not used in the visible portion — presumably forces deletion
        of objects in transient states; verify against the full method
    """
    k8s_helper = get_k8s_helper()
    crd_group, crd_version, crd_plural = self._get_crd_info()
    try:
        crd_objects = k8s_helper.crdapi.list_namespaced_custom_object(
            crd_group,
            crd_version,
            namespace,
            crd_plural,
            label_selector=label_selector,
        )
    except ApiException as e:
        # ignore error if crd is not defined
        if e.status != 404:
            raise
    else:
        for crd_object in crd_objects['items']:
            # best effort - don't let one failure in pod deletion to cut the whole operation
            # NOTE(review): the loop body continues beyond this chunk
def _delete_pod(namespace, pod):
    """Delete a single pod, treating an already-gone pod (HTTP 404) as success."""
    helper = get_k8s_helper()
    pod_name = pod.metadata.name
    try:
        helper.v1api.delete_namespaced_pod(pod_name, namespace)
    except ApiException as exc:
        # a pod that disappeared on its own is fine; anything else is a real error
        if exc.status != 404:
            raise
    else:
        logger.info(f"Deleted pod: {pod_name}")
def _delete_pod_resources(
    self,
    db: DBInterface,
    db_session: Session,
    namespace: str,
    label_selector: str = None,
    force: bool = False,
):
    """Delete this runtime's pods in *namespace*.

    Lists pods matching *label_selector* and handles each one individually.
    With ``force=True`` every pod is deleted immediately; otherwise the pod's
    state is first resolved via ``self._is_pod_in_transient_state`` (the
    handling after that check continues beyond this chunk).

    :param db: database interface passed to the transient-state check
    :param db_session: database session paired with *db*
    :param namespace: k8s namespace to operate in
    :param label_selector: optional k8s label selector to narrow the listing
    :param force: delete pods without checking their state first
    """
    k8s_helper = get_k8s_helper()
    pods = k8s_helper.v1api.list_namespaced_pod(
        namespace, label_selector=label_selector
    )
    for pod in pods.items:
        # best effort - don't let one failure in pod deletion to cut the whole operation
        try:
            if force:
                self._delete_pod(namespace, pod)
                continue
            # it is less likely that there will be new stable states, or the existing ones will change so better to
            # resolve whether it's a transient state by checking if it's not a stable state
            in_transient_state = self._is_pod_in_transient_state(
                db, db_session, pod
            )
            # NOTE(review): the handling of in_transient_state continues beyond this chunk
def list_resources(self, label_selector: str = None) -> Dict:
    """Build the full resources listing for this runtime.

    Resolves the target namespace and the effective label selector, gathers
    both pod-backed and CRD-backed resources, assembles them into a single
    response, and lets the runtime enrich that response before returning it.
    """
    helper = get_k8s_helper()
    namespace = helper.resolve_namespace()
    selector = self._resolve_label_selector(label_selector)
    pods = self._list_pod_resources(namespace, selector)
    crds = self._list_crd_resources(namespace, selector)
    response = self._build_list_resources_response(pods, crds)
    return self._enrich_list_resources_response(response, namespace, selector)
def _delete_resources(
    self,
    db: DBInterface,
    db_session: Session,
    namespace: str,
    label_selector: str = None,
    force: bool = False,
):
    """
    Handling services deletion

    Collects the dask cluster names of scheduler pods that are safe to clean
    up (stable phase, or any phase when *force* is set), then lists the
    matching services. The actual service deletion continues beyond this
    chunk.

    :param db: database interface — not used in the visible portion
    :param db_session: database session — not used in the visible portion
    :param namespace: k8s namespace to operate in
    :param label_selector: optional k8s label selector to narrow the listings
    :param force: also consider pods that are still in a transient phase
    """
    k8s_helper = get_k8s_helper()
    pods = k8s_helper.v1api.list_namespaced_pod(
        namespace, label_selector=label_selector
    )
    service_names = []
    for pod in pods.items:
        # a phase outside the stable set means the pod is still in flight
        in_transient_phase = pod.status.phase not in PodPhases.stable_phases()
        if not in_transient_phase or (force and in_transient_phase):
            # only dask scheduler pods carry the cluster-name label we need
            comp = pod.metadata.labels.get('dask.org/component')
            if comp == 'scheduler':
                service_names.append(
                    pod.metadata.labels.get('dask.org/cluster-name')
                )
    services = k8s_helper.v1api.list_namespaced_service(
        namespace, label_selector=label_selector
    )
    # NOTE(review): deletion of the collected services continues beyond this chunk
    # NOTE(review): tail of a function signature whose `def` line is outside this chunk
    run_id, timeout=60 * 60, expected_statuses: typing.List[str] = None, namespace=None
):
    """Wait for Pipeline status, timeout in sec

    :param run_id:     id of pipelines run
    :param timeout:    wait timeout in sec
    :param expected_statuses:  list of expected statuses, one of [ Succeeded | Failed | Skipped | Error ], by default
                               [ Succeeded ]
    :param namespace:  k8s namespace if not default

    :return: kfp run dict
    """
    if expected_statuses is None:
        expected_statuses = [RunStatuses.succeeded]
    namespace = namespace or mlconf.namespace
    # outside the cluster we must poll through the mlrun DB/API instead of kfp directly
    remote = not get_k8s_helper(silent=True).is_running_inside_kubernetes_cluster()
    logger.debug(
        f"Waiting for run completion."
        f" run_id: {run_id},"
        f" expected_statuses: {expected_statuses},"
        f" timeout: {timeout},"
        f" remote: {remote},"
        f" namespace: {namespace}"
    )
    if remote:
        mldb = get_run_db().connect()

        # polled predicate: fetch the run and report whether it reached a stable status
        def get_pipeline_if_completed(run_id, namespace=namespace):
            resp = mldb.get_pipeline(run_id, namespace=namespace)
            status = resp['run']['status']
            if status not in RunStatuses.stable_statuses():
                # NOTE(review): body continues beyond this chunk
def _list_crd_resources(self, namespace: str, label_selector: str = None) -> List:
    """List this runtime's CRD-backed resources in *namespace*.

    Returns ``None`` when the runtime defines no CRD (any of group/version/
    plural missing). A 404 from the list call (CRD not installed in the
    cluster) is silently ignored. The success branch that converts the listed
    objects into the returned resources continues beyond this chunk.

    :param namespace: k8s namespace to list in
    :param label_selector: optional k8s label selector to narrow the listing
    """
    k8s_helper = get_k8s_helper()
    crd_group, crd_version, crd_plural = self._get_crd_info()
    crd_resources = None
    if crd_group and crd_version and crd_plural:
        try:
            crd_objects = k8s_helper.crdapi.list_namespaced_custom_object(
                crd_group,
                crd_version,
                namespace,
                crd_plural,
                label_selector=label_selector,
            )
        except ApiException as e:
            # ignore error if crd is not defined
            if e.status != 404:
                raise
        else:
            # NOTE(review): success branch continues beyond this chunk
    # NOTE(review): tail of a call (builder-pod construction, presumably) whose
    # opening line is outside this chunk
    context,
    dest,
    dockertext=dock,
    inline_code=inline_code,
    inline_path=inline_path,
    requirements=requirements_list,
    secret_name=secret_name,
    name=name,
    verbose=verbose,
)
if to_mount:
    # todo: support different mounters
    kpod.mount_v3io(remote=source, mount_path='/context')
k8s = get_k8s_helper()
kpod.namespace = k8s.resolve_namespace(namespace)
if interactive:
    # run the build pod in the foreground and stream its output
    return k8s.run_job(kpod)
else:
    # fire-and-forget: create the pod and tell the user how to watch its logs
    pod, ns = k8s.create_pod(kpod)
    logger.info(
        'started build, to watch build logs use "mlrun watch {} {}"'.format(pod, ns)
    )
    return 'build:{}'.format(pod)
def get_obj_status(selector=[], namespace=None):
    """Return the lowercase phase of the dask scheduler pod matching *selector*.

    Scans scheduler pods; on the first one in ``running`` phase, logs the
    function/cluster and returns ``'running'`` immediately. Otherwise keeps
    the phase of the last pod seen (the non-running branch continues beyond
    this chunk).

    :param selector: extra k8s label selector terms joined onto the
        ``dask.org/component=scheduler`` base selector
        # NOTE(review): mutable default argument; it is not mutated here, but
        # `selector=None` with `selector or []` would be the safe idiom
    :param namespace: k8s namespace; defaults to the configured one
    """
    k8s = get_k8s_helper()
    namespace = namespace or config.namespace
    selector = ','.join(['dask.org/component=scheduler'] + selector)
    pods = k8s.list_pods(namespace, selector=selector)
    status = ''
    for pod in pods:
        status = pod.status.phase.lower()
        # NOTE(review): looks like a debug leftover — dumps the whole pod
        # object to stdout on every iteration; consider removing
        print(pod)
        if status == 'running':
            cluster = pod.metadata.labels.get('dask.org/cluster-name')
            logger.info(
                'found running dask function {}, cluster={}'.format(
                    pod.metadata.name, cluster
                )
            )
            return status
        logger.info(