def parse_command(runtime, url):
    idx = url.find('#')
    if idx > -1:
        update_in(runtime, 'spec.image', url[:idx])
        url = url[idx + 1 :]

    if url:
        arg_list = url.split()
        update_in(runtime, 'spec.command', arg_list[0])
        update_in(runtime, 'spec.args', arg_list[1:])
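For context, a hedged usage sketch of parse_command above; it assumes update_in is mlrun.utils.update_in, the dotted-key dict setter these snippets rely on, and the resulting dict is an expected shape rather than verified output:

from mlrun.utils import update_in  # dotted-key setter used by parse_command

runtime = {}
parse_command(runtime, 'mlrun/mlrun:latest#python main.py --epochs 5')
# runtime should now look roughly like:
# {'spec': {'image': 'mlrun/mlrun:latest',
#           'command': 'python',
#           'args': ['main.py', '--epochs', '5']}}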
pod_spec.image_pull_secrets = [
    client.V1LocalObjectReference(name=spec.image_pull_secret)
]
pod = client.V1Pod(
    metadata=client.V1ObjectMeta(namespace=namespace, labels=pod_labels),
    # annotations=meta.annotation),
    spec=pod_spec,
)
svc_temp = dask.config.get("kubernetes.scheduler-service-template")
if spec.service_type or spec.node_port:
    if spec.node_port:
        spec.service_type = 'NodePort'
        svc_temp['spec']['ports'][1]['nodePort'] = spec.node_port
    update_in(svc_temp, 'spec.type', spec.service_type)

norm_name = normalize_name(meta.name)
dask.config.set(
    {
        "kubernetes.scheduler-service-template": svc_temp,
        'kubernetes.name': 'mlrun-' + norm_name + '-{uuid}',
    }
)
cluster = KubeCluster(
    pod,
    deploy_mode='remote',
    namespace=namespace,
    scheduler_timeout=spec.scheduler_timeout,
)
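# Illustrative effect of the configuration above (assumed values only:
# spec.node_port == 30100, normalize_name(meta.name) == 'trainer'):
#   svc_temp['spec']['type']                 == 'NodePort'
#   svc_temp['spec']['ports'][1]['nodePort'] == 30100
#   dask.config.get('kubernetes.name')       == 'mlrun-trainer-{uuid}'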
def _run(self, runobj: RunObject, execution: MLClientCtx):
    if runobj.metadata.iteration:
        self.store_run(runobj)
    job = deepcopy(_sparkjob_template)
    meta = self._get_meta(runobj, True)
    pod_labels = deepcopy(meta.labels)
    pod_labels['mlrun/job'] = meta.name
    update_in(job, 'metadata', meta.to_dict())
    update_in(job, 'spec.driver.labels', pod_labels)
    update_in(job, 'spec.executor.labels', pod_labels)
    update_in(job, 'spec.executor.instances', self.spec.replicas or 1)
    if self.spec.image:
        update_in(job, 'spec.image', self.spec.image)
    update_in(job, 'spec.volumes', self.spec.volumes)
    extra_env = {'MLRUN_EXEC_CONFIG': runobj.to_json()}
    # if self.spec.rundb:
    #     extra_env['MLRUN_DBPATH'] = self.spec.rundb
    extra_env = [{'name': k, 'value': v} for k, v in extra_env.items()]
    update_in(job, 'spec.driver.env', extra_env + self.spec.env)
    update_in(job, 'spec.executor.env', extra_env + self.spec.env)
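    # note: the extra_env dict is converted above into the Kubernetes-style list of
    # name/value mappings before being merged with self.spec.env, e.g.
    #   {'MLRUN_EXEC_CONFIG': '<run json>'} -> [{'name': 'MLRUN_EXEC_CONFIG', 'value': '<run json>'}]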
    update_in(job, 'spec.driver.volumeMounts', self.spec.volume_mounts)
    update_in(job, 'spec.executor.volumeMounts', self.spec.volume_mounts)
    update_in(job, 'spec.deps', self.spec.deps)
code = environ.get('MLRUN_EXEC_CODE')
if get_in(runtime, 'kind', '') == 'dask':
    code = get_in(runtime, 'spec.build.functionSourceCode', code)
if from_env and code:
    code = b64decode(code).decode('utf-8')
    if kfp:
        print('code:\n{}\n'.format(code))
    with open('main.py', 'w') as fp:
        fp.write(code)
    url = url or 'main.py'

if url:
    update_in(runtime, 'spec.command', url)
if run_args:
    update_in(runtime, 'spec.args', list(run_args))
if image:
    update_in(runtime, 'spec.image', image)
set_item(runobj.spec, handler, 'handler')
set_item(runobj.spec, param, 'parameters', fill_params(param))
set_item(runobj.spec, hyperparam, 'hyperparams', fill_params(hyperparam))
set_item(runobj.spec, param_file, 'param_file')
set_item(runobj.spec, tuning_strategy, 'tuning_strategy')
set_item(runobj.spec, selector, 'selector')
set_item(runobj.spec, inputs, run_keys.inputs, list2dict(inputs))
set_item(runobj.spec, in_path, run_keys.input_path)
set_item(runobj.spec, out_path, run_keys.output_path)
set_item(runobj.spec, outputs, run_keys.outputs, list(outputs))
set_item(
    runobj.spec, secrets, run_keys.secrets, line2keylist(secrets, 'kind', 'source')
)
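set_item itself is not shown in these excerpts; the following is a rough sketch of its semantics inferred from the calls above (an assumption, not mlrun's verbatim helper): assign the attribute only when the CLI option was actually given, preferring the pre-processed value when one is supplied.

def set_item(obj, item, key, value=None):
    # sketch: skip unset CLI options; prefer the pre-processed value
    # (e.g. fill_params / list2dict output) over the raw option
    if item:
        setattr(obj, key, value if value is not None else item)
    return obj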
def with_requests(self, mem=None, cpu=None):
    requests = {}
    if mem:
        requests['memory'] = mem
    if cpu:
        requests['cpu'] = cpu
    update_in(self.spec.resources, 'requests', requests)
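A hedged usage note: because 'requests' is a flat key here, the update_in call above simply stores the dict under spec.resources['requests'], for example:

# illustrative only; `fn` stands for any runtime object exposing with_requests
fn.with_requests(mem='2G', cpu='500m')
# fn.spec.resources would then contain:
# {'requests': {'memory': '2G', 'cpu': '500m'}}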
def _update_state(self, rundict: dict):
    last_state = get_in(rundict, 'status.state', '')
    if last_state != 'error':
        update_in(rundict, 'status.state', 'completed')
    self._store_run_dict(rundict)
    return rundict
if _k8s:
    pods = _k8s.get_logger_pods(uid)
    if pods:
        pod, new_status = list(pods.items())[0]
        new_status = new_status.lower()

        # TODO: handle in cron/tracking
        if new_status != 'pending':
            resp = _k8s.logs(pod)
            if resp:
                out = resp.encode()[offset:]
            if status == 'running':
                now = str(datetime.now())
                update_in(data, 'status.last_update', now)
                if new_status == 'failed':
                    update_in(data, 'status.state', 'error')
                    update_in(
                        data, 'status.error', 'error, check logs')
                    _db.store_run(data, uid, project)
                if new_status == 'succeeded':
                    update_in(data, 'status.state', 'completed')
                    _db.store_run(data, uid, project)
            status = new_status
        else:
            out = resp or out

return Response(out, mimetype='text/plain',
                headers={"pod_status": status})
if 'requests' in self.spec.resources:
    if 'cpu' in self.spec.resources['requests']:
        update_in(job, 'spec.executor.cores', self.spec.resources['requests']['cpu'])
if 'limits' in self.spec.resources:
    if 'cpu' in self.spec.resources['limits']:
        update_in(job, 'spec.executor.coreLimit', self.spec.resources['limits']['cpu'])
    if 'memory' in self.spec.resources['limits']:
        update_in(job, 'spec.executor.memory', self.spec.resources['limits']['memory'])
if self.spec.command:
    update_in(job, 'spec.mainApplicationFile', self.spec.command)
update_in(job, 'spec.arguments', self.spec.args)
resp = self._submit_job(job, meta.namespace)
name = get_in(resp, 'metadata.name', 'unknown')
state = get_in(resp, 'status.applicationState.state', 'SUBMITTED')
logger.info('SparkJob {} state={}'.format(meta.name, state))
# clear the tag from the object in case another function "takes" that tag later
update_in(function, "metadata.tag", "")

# versioned controls whether this function object is versioned, i.e. queryable by its hash key.
# When enabled, the uid is set to the hash key so every version gets its own record
# (the unique constraint on Function is the set (project, name, uid)).
# When disabled, a single function object is kept per (project, name, tag) and reused on every
# store call (we don't want a new record per version), so the uid is set to unversioned-{tag}.
if versioned:
    uid = hash_key
else:
    uid = f'unversioned-{tag}'
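# e.g. (illustrative): versioned=True  -> uid == hash_key (the function's content hash)
#                      versioned=False -> uid == 'unversioned-latest' when tag == 'latest'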
updated = datetime.now(timezone.utc)
update_in(function, "metadata.updated", updated)
fn = self._get_function(session, name, project, uid)
if not fn:
    fn = Function(
        name=name,
        project=project,
        uid=uid,
    )
fn.updated = updated
labels = get_in(function, "metadata.labels", {})
update_labels(fn, labels)
fn.struct = function
self._upsert(session, fn)
self.tag_objects_v2(session, [fn], project, tag)
return hash_key