Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_handle_pause(mocker):
mocker.patch("deck_chores.main.reassign_jobs", mocker.Mock(return_value=None))
job = mocker.MagicMock(spec_set=Job)
mocker.patch(
"deck_chores.jobs.get_jobs_for_container", mocker.Mock(return_value=[job])
)
handle_pause({"Actor": {"ID": "a"}})
job.pause.assert_called_once()
def test_resume_job(self, scheduler, freeze_time, dead_job):
next_fire_time = None if dead_job else freeze_time.current + timedelta(seconds=1)
trigger = MagicMock(BaseTrigger, get_next_fire_time=lambda prev, now: next_fire_time)
returned_job = MagicMock(Job, id='foo', trigger=trigger)
scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar'))
scheduler.modify_job = MagicMock()
scheduler.remove_job = MagicMock()
scheduler.resume_job('foo')
if dead_job:
scheduler.remove_job.assert_called_once_with('foo', 'bar')
else:
scheduler.modify_job.assert_called_once_with('foo', 'bar',
next_run_time=next_fire_time)
def setup(self):
self.trigger_date = datetime(2999, 1, 1)
self.earlier_date = datetime(2998, 12, 31)
self.trigger = DateTrigger({}, self.trigger_date, local_tz)
self.job = Job(self.trigger, dummy_job, [], {}, 1, False, None, None, 1)
self.job.next_run_time = self.trigger_date
def load_jobs(self):
jobs = []
for job_dict in self.table.run(self.conn):
try:
job = Job.__new__(Job)
job_dict['id'] = job_dict.pop('id')
job_dict['trigger'] = (
pickle
.loads(
job_dict['trigger'].decode("base64").decode("zip")
)
)
job_dict['args'] = (
pickle
.loads(
job_dict['args'].decode("base64").decode("zip")
)
)
job_dict['kwargs'] = (
pickle
.loads(
def _reconstitute_job(self, job_state):
job_state = pickle.loads(job_state)
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
return job
def _reconstitute_job(self, job_state):
job_state = pickle.loads(job_state)
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
return job
def _reconstitute_job(self, job_state):
job_state = pickle.loads(job_state)
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
return job
def add_job(self, trigger, func, args, kwargs, jobstore='default',
**options):
"""
Adds the given job to the job list and notifies the scheduler thread.
:param trigger: alias of the job store to store the job in
:param func: callable to run at the given time
:param args: list of positional arguments to call func with
:param kwargs: dict of keyword arguments to call func with
:param jobstore: alias of the job store to store the job in
:rtype: :class:`~apscheduler.job.Job`
"""
job = Job(trigger, func, args or [], kwargs or {},
options.pop('misfire_grace_time', self.misfire_grace_time),
options.pop('coalesce', self.coalesce), **options)
if not self.running:
self._pending_jobs.append((job, jobstore))
logger.info('Adding job tentatively -- it will be properly '
'scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, True)
return job