Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _loop(self):
    """Poll the registered jobs forever and dispatch the ones that are due.

    Each pass samples the current time once, logs it, and then, while holding
    ``self.lock``, walks over the jobs yielded by iterating ``self``. A job
    whose ``next`` value is not later than the sampled time is submitted to
    ``self.pool`` and advanced. The pass ends with a sleep of
    ``self.sleep_time_sec`` seconds. The loop never returns.
    """
    while True:
        tick = datetime.now()
        logger.info('scheduler loop at %s', tick)

        with self.lock:
            for job in self:
                if job.next > tick:
                    # Not due yet; leave it for a later pass.
                    continue
                logger.info('scheduling job')
                entry_point = job.runtime.run
                self.pool.submit(entry_point, *job.args, **job.kw)
                job.advance()

        # The pause happens after the ``with`` block, i.e. once the lock has
        # been released.
        sleep(self.sleep_time_sec)
async def store_run(
    request: Request,
    project: str,
    uid: str,
    iter: int = 0,
    db_session: Session = Depends(deps.get_db_session),
):
    """Store the run described by the JSON body of ``request``.

    The body is decoded as JSON; a ``ValueError`` while decoding is reported
    through ``log_and_raise`` with ``HTTPStatus.BAD_REQUEST``. The decoded
    payload is then handed to the DB object's ``store_run`` via
    ``run_in_threadpool``, together with ``uid``, ``project`` and ``iter``.

    :returns: an empty dict
    """
    payload = None
    try:
        payload = await request.json()
    except ValueError:
        log_and_raise(HTTPStatus.BAD_REQUEST, reason="bad JSON body")

    logger.debug(payload)

    # Resolve the bound DB method first, then run it off the event loop.
    store = get_db().store_run
    await run_in_threadpool(store, db_session, payload, uid, project, iter=iter)

    message = "store run: {}".format(payload)
    logger.info(message)
    return {}
# NOTE(review): this span is an excerpt from the body of a method whose
# signature is not part of this view, and its indentation has been lost. The
# comments describe only what the visible statements do; confirm the nesting
# against the full source.
#
# Short-circuit: when skip_deployed is set and the function reports
# is_deployed, the state is set to 'ready' and True is returned.
if skip_deployed and self.is_deployed:
self.status.state = 'ready'
return True
build = self.spec.build
# No build source, no build commands and with_mlrun is false: there is nothing
# to build. A missing spec.image raises ValueError; otherwise the state is set
# to 'ready' and True is returned.
# NOTE(review): 'noting' in the message below looks like a typo for 'nothing'.
if not build.source and not build.commands and not with_mlrun:
if not self.spec.image:
raise ValueError(
'noting to build and image is not specified, '
'please set the function image or build args'
)
self.status.state = 'ready'
return True
# Same "no source / no commands" condition but with with_mlrun set: only an
# informational message is logged here.
if not build.source and not build.commands and with_mlrun:
logger.info(
'running build to add mlrun package, set '
'with_mlrun=False to skip if its already in the image'
)
# Keep an explicitly configured build image, otherwise fall back to the value
# returned by default_image_name(self); then reset the state to ''.
self.spec.build.image = self.spec.build.image or default_image_name(self)
self.status.state = ''
# When _is_remote_api() is true and is_kfp is false, the build is requested
# through the DB object's remote_builder call.
if self._is_remote_api() and not is_kfp:
db = self._get_db()
logger.info(
'starting remote build, image: {}'.format(self.spec.build.image)
)
data = db.remote_builder(self, with_mlrun)
# The response is read as a dict: status comes from data['data'], the image
# from the dotted path 'data.spec.image', and 'ready' defaults to False.
self.status = data['data'].get('status', None)
self.spec.image = get_in(data, 'data.spec.image')
ready = data.get('ready', False)
# Gather the status of a workflow run and count how many of its runs ended in
# the 'error' state.
# NOTE(review): the definition is cut off after `if had_errors:` in this view
# and its indentation has been lost; what happens with `text`, `had_errors`
# and `notifiers` afterwards is not visible here.
def get_run_status(
self,
workflow_id,
timeout=60 * 60,
expected_statuses=None,
notifiers: RunNotifications = None,
):
status = ''
# A truthy timeout triggers a wait via wait_for_pipeline_completion; the
# default is 60 * 60.
if timeout:
logger.info('waiting for pipeline run completion')
run_info = wait_for_pipeline_completion(
workflow_id, timeout=timeout, expected_statuses=expected_statuses
)
# status is taken from run_info['run'] only when run_info is truthy;
# otherwise it keeps its initial value ''.
if run_info:
status = run_info['run'].get('status')
# List the runs in this project labelled workflow=<workflow_id> and count the
# ones whose status state equals 'error'.
mldb = get_run_db().connect(self._secrets)
runs = mldb.list_runs(project=self.name, labels=f'workflow={workflow_id}')
had_errors = 0
for r in runs:
if r['status'].get('state', '') == 'error':
had_errors += 1
text = f'Workflow {workflow_id} finished'
if had_errors:
async def _periodic_function_wrapper(interval: int, function, *args, **kwargs):
    """Invoke ``function`` forever, pausing ``interval`` seconds between calls.

    Coroutine functions are awaited directly; any other callable is handed to
    ``run_in_threadpool``. An ``Exception`` raised by a call is logged as a
    warning together with its traceback, and the loop carries on.

    :param interval: seconds passed to ``asyncio.sleep`` after every call
    :param function: coroutine function or plain callable to invoke
    :param args:     positional arguments forwarded to ``function``
    :param kwargs:   keyword arguments forwarded to ``function``
    """
    while True:
        try:
            if asyncio.iscoroutinefunction(function):
                await function(*args, **kwargs)
            else:
                await run_in_threadpool(function, *args, **kwargs)
        except Exception:
            # functools.partial objects and callable instances carry no
            # __name__; fall back to repr() so that building the log message
            # cannot itself raise and terminate the periodic loop.
            name = getattr(function, '__name__', repr(function))
            logger.warning(
                f'Failed during periodic function execution: {name}, exc: {traceback.format_exc()}'
            )
        await asyncio.sleep(interval)
def _initialize_db():
    """Create the module-level ``db`` object and initialise it.

    When ``config.httpdb.db_type`` is ``"filedb"`` a ``FileDB`` rooted at
    ``config.httpdb.dirpath`` is created and initialised without a session.
    Otherwise an ``SQLDB`` is created from ``config.httpdb.dsn`` and
    initialised with a session from ``create_session()``, which is closed
    afterwards whether or not initialisation succeeded.
    """
    global db
    if config.httpdb.db_type == "filedb":
        logger.info("using FileRunDB")
        db = FileDB(config.httpdb.dirpath)
        db.initialize(None)
        return

    logger.info("using SQLDB")
    db = SQLDB(config.httpdb.dsn)
    # Open the session before entering the try block: if create_session()
    # itself fails there is nothing to close, and its error must propagate
    # unchanged rather than be masked by an AttributeError on None.close().
    db_session = create_session()
    try:
        db.initialize(db_session)
    finally:
        db_session.close()
# NOTE(review): this span is an excerpt from the body of a method whose
# signature is not part of this view, and its indentation has been lost. The
# comments describe only what the visible statements do.
#
# last_err holds the RunError caught on the single-run path (None otherwise)
# and is passed to _wrap_result at the end.
last_err = None
if task_generator:
# multiple runs (based on hyper params or params file)
generator = task_generator.generate(runspec)
results = self._run_many(generator, execution, runspec)
results_to_iter(results, runspec, execution)
result = execution.to_dict()
else:
# single run
try:
resp = self._run(runspec, execution)
# Logs are followed only when watch is set and self.kind is not one of
# '', 'handler' or 'local'; a state other than 'succeeded' is logged as a
# warning.
if watch and self.kind not in ['', 'handler', 'local']:
state = runspec.logs(True, self._get_db())
if state != 'succeeded':
logger.warning('run ended with state {}'.format(state))
result = self._post_run(resp, task=runspec)
except RunError as err:
# A RunError does not propagate: it is remembered and _post_run is called
# with the error instead of a response.
last_err = err
result = self._post_run(task=runspec, err=err)
return self._wrap_result(result, runspec, err=last_err)
def _schedule(task: Task, delay_seconds):
    """Run ``task`` in an endless loop, aiming for one start per period.

    The time spent inside ``task.run()`` (measured with ``monotonic``) is
    subtracted from ``delay_seconds`` to get the pause before the next run;
    the pause is never negative. An ``Exception`` raised by the task is
    logged with ``logger.exception`` and does not stop the loop.

    :param task:          object whose ``run()`` method is called each cycle
    :param delay_seconds: target number of seconds between run starts
    """
    while True:
        began = monotonic()
        try:
            try:
                task.run()
            except Exception as err:
                logger.exception('task error - %s', err)
        except Exception:
            # Deliberate best effort: a failure raised while reporting the
            # task error must not terminate the scheduling loop.
            pass

        elapsed = monotonic() - began
        sleep(max(delay_seconds - elapsed, 0))
# NOTE(review): this span is an excerpt from the body of a request handler
# whose signature is not part of this view. `request`, `data`, `namespace`,
# `experiment_name` and `run_name` come from the enclosing scope, and
# `arguments` is presumably initialised before this point. Confirm against the
# full source.

# Optional pipeline arguments arrive as a Python literal in a request header.
arguments_data = request.headers.get("pipeline-arguments")
if arguments_data:
    try:
        arguments = ast.literal_eval(arguments_data)
    except (ValueError, SyntaxError) as err:
        # A malformed header is a client error, not a server failure.
        log_and_raise(
            HTTPStatus.BAD_REQUEST,
            reason="bad pipeline arguments: {}".format(err),
        )
    logger.info("pipeline arguments {}".format(arguments_data))

# Map the request content type to the suffix used for the temporary file.
ctype = request.headers.get("content-type", "")
if "/yaml" in ctype:
    ctype = ".yaml"
elif "/zip" in ctype:
    # Matches e.g. "application/zip".
    ctype = ".zip"
else:
    log_and_raise(
        HTTPStatus.BAD_REQUEST, reason="unsupported pipeline type {}".format(ctype)
    )

logger.info("writing file {}".format(ctype))
# NamedTemporaryFile creates the file atomically, avoiding the race of picking
# a name first and opening it later. delete=False keeps it on disk after the
# handle is closed so the pipeline client can read it by path.
with tempfile.NamedTemporaryFile(suffix=ctype, delete=False) as fp:
    pipe_tmp = fp.name
    fp.write(data)

run = None
try:
    client = kfclient(namespace=namespace)
    experiment = client.create_experiment(name=experiment_name)
    run = client.run_pipeline(experiment.id, run_name, pipe_tmp, params=arguments)
except Exception as e:
    log_and_raise(HTTPStatus.BAD_REQUEST, reason="kfp err: {}".format(e))
finally:
    # Always remove the temporary pipeline file, on success and on any failure.
    remove(pipe_tmp)