Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
text = text.replace(',hour,', ' hour,')
text = text.replace('day hour,', 'day,hour ')
text = text.replace('hours,', 'hours ')
text = text.replace('hour,', 'hour ')
text = text.replace('working,day', 'working day')
text = text.replace('week,day', 'week day')
text = text.replace(',minutes', ' minutes')
text = text.replace(',minute', ' minute')
text = text.replace(',day', ' day')
text = text.replace(',end', ' end')
text = text.replace(',month,', ' month,')
# get parts separated by comma
parts = [part.strip() for part in text.split(',') if part]
try:
specs = self._parse_parts(parts)
sched = JobSchedule(**specs)
except:
raise ValueError(f'Cannot parse {orig_text}, read as {parts}')
return sched
config_cell = cell
if not config_cell:
return {}
yaml_conf = '\n'.join(
[re.sub('#', '', x, 1) for x in str(
config_cell.source).splitlines()])
try:
yaml_conf = yaml.safe_load(yaml_conf)
config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
except Exception:
raise ValueError(
'Notebook configuration cannot be parsed')
# translate config to canonical form
# TODO refactor to seperate method / mapped translation functions
if 'schedule' in config:
config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
if 'cron' in config:
config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
if 'run-at' in config:
config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
return config
def from_text(cls, text):
return JobSchedule(text=text)
def __init__(self, prefix=None, store=None, defaults=None):
self.defaults = defaults or omega_settings()
prefix = prefix or 'jobs'
self.store = store or OmegaStore(prefix=prefix)
self.kind = MDREGISTRY.OMEGAML_JOBS
self._include_dir_placeholder = True
# convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
self.Schedule = JobSchedule
[re.sub('#', '', x, 1) for x in str(
config_cell.source).splitlines()])
try:
yaml_conf = yaml.safe_load(yaml_conf)
config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
except Exception:
raise ValueError(
'Notebook configuration cannot be parsed')
# translate config to canonical form
# TODO refactor to seperate method / mapped translation functions
if 'schedule' in config:
config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
if 'cron' in config:
config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
if 'run-at' in config:
config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
return config