How to use APScheduler - 10 common examples

To help you get started, we’ve selected a few APScheduler examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github agronholm / apscheduler / tests / test_triggers.py View on Github external
def test_cron_trigger_1(self, timezone):
    """Check repr/str of a year/month/day cron trigger and its next fire time.

    The next fire time is requested with no previous fire time, searching
    from 2008-12-01 localized in the supplied ``timezone``.
    """
    trigger = CronTrigger(year='2009/2', month='1/3', day='5-13', timezone=timezone)

    # NOTE(review): the expected repr below is an empty string. It looks like
    # an angle-bracketed value may have been lost when this snippet was
    # extracted -- confirm against the upstream test before relying on it.
    expected_repr = ""
    expected_str = "cron[year='2009/2', month='1/3', day='5-13']"
    assert repr(trigger) == expected_repr
    assert str(trigger) == expected_str

    search_from = timezone.localize(datetime(2008, 12, 1))
    expected_fire_time = timezone.localize(datetime(2009, 1, 5))
    assert trigger.get_next_fire_time(None, search_from) == expected_fire_time
github agronholm / apscheduler / tests / testtriggers.py View on Github external
def test_cron_year_list(self):
    """Check a cron trigger whose year field is the list ``'2009,2008'``."""
    trigger = CronTrigger(self.defaults, year='2009,2008')

    # NOTE(review): the expected repr below is an empty string. It looks like
    # an angle-bracketed value may have been lost when this snippet was
    # extracted -- confirm against the upstream test before relying on it.
    eq_(repr(trigger), "")
    eq_(str(trigger), "cron[year='2009,2008']")

    # The search start and the expected fire time are the same instant.
    search_from = datetime(2009, 1, 1, tzinfo=local_tz)
    expected_fire_time = datetime(2009, 1, 1, tzinfo=local_tz)
    eq_(trigger.get_next_fire_time(search_from), expected_fire_time)
github agronholm / apscheduler / tests / test_triggers.py View on Github external
def test_from_crontab(self, expr, expected_repr, timezone):
    """Check that a trigger built from a crontab expression has the expected repr."""
    actual_repr = repr(CronTrigger.from_crontab(expr, timezone))
    assert actual_repr == expected_repr
github funkyfuture / deck-chores / tests / test_main.py View on Github external
def test_handle_pause(mocker):
    """Check that handle_pause() pauses the job returned by the patched lookup once."""
    # Stand-ins for the two collaborators that get patched.
    reassign_stub = mocker.Mock(return_value=None)
    mocker.patch("deck_chores.main.reassign_jobs", reassign_stub)

    job = mocker.MagicMock(spec_set=Job)
    jobs_lookup_stub = mocker.Mock(return_value=[job])
    mocker.patch("deck_chores.jobs.get_jobs_for_container", jobs_lookup_stub)

    pause_event = {"Actor": {"ID": "a"}}
    handle_pause(pause_event)

    job.pause.assert_called_once()
github agronholm / apscheduler / tests / test_schedulers.py View on Github external
def test_resume_job(self, scheduler, freeze_time, dead_job):
    """Check what resume_job() does depending on whether the trigger can fire again.

    When ``dead_job`` is true the stubbed trigger reports no next fire time and
    the job is expected to be removed; otherwise it is expected to be modified
    with the stubbed next fire time as ``next_run_time``.
    """
    if dead_job:
        next_fire_time = None
    else:
        next_fire_time = freeze_time.current + timedelta(seconds=1)

    def fake_get_next_fire_time(prev, now):
        # Always report the precomputed value, whatever the arguments are.
        return next_fire_time

    trigger = MagicMock(BaseTrigger, get_next_fire_time=fake_get_next_fire_time)
    job_stub = MagicMock(Job, id='foo', trigger=trigger)

    # Replace the scheduler's collaborators so that only resume_job() runs for real.
    scheduler._lookup_job = MagicMock(return_value=(job_stub, 'bar'))
    scheduler.modify_job = MagicMock()
    scheduler.remove_job = MagicMock()

    scheduler.resume_job('foo')

    if not dead_job:
        scheduler.modify_job.assert_called_once_with(
            'foo', 'bar', next_run_time=next_fire_time)
        return
    scheduler.remove_job.assert_called_once_with('foo', 'bar')
github agronholm / apscheduler / tests / testjobstores.py View on Github external
def setup(self):
    """Create the shared fixtures: two dates, a date trigger and a job using it."""
    trigger_date = datetime(2999, 1, 1)
    self.trigger_date = trigger_date
    self.earlier_date = datetime(2998, 12, 31)
    self.trigger = DateTrigger({}, trigger_date, local_tz)

    job = Job(self.trigger, dummy_job, [], {}, 1, False, None, None, 1)
    # The job's next run is pinned to the trigger's own date.
    job.next_run_time = trigger_date
    self.job = job
github agronholm / apscheduler / tests / test_schedulers.py View on Github external
def test_get_job_nonexistent_job(self, scheduler):
    """Check that get_job() returns None when the job lookup raises JobLookupError."""
    lookup_failure = JobLookupError('foo')
    scheduler._lookup_job = MagicMock(side_effect=lookup_failure)

    found = scheduler.get_job('foo')

    assert found is None
github agronholm / apscheduler / tests / test_triggers.py View on Github external
def test_jitter_produces_different_valid_results(self, timezone):
    """Check that a jittered interval trigger varies its fire time within bounds.

    The trigger is asked 100 times for its next fire time; every answer must
    fall 2 to 8 seconds after ``now`` and at least two answers must differ.
    """
    trigger = IntervalTrigger(seconds=5, timezone=timezone, jitter=3)
    now = datetime.now(timezone)

    # Bounds match the configured interval plus/minus the jitter (5 +/- 3 s).
    earliest_delay = timedelta(seconds=2)
    latest_delay = timedelta(seconds=8)

    distinct_fire_times = set()
    for _ in range(100):
        fire_time = trigger.get_next_fire_time(None, now)
        distinct_fire_times.add(fire_time)
        delay = fire_time - now
        assert earliest_delay <= delay <= latest_delay

    assert len(distinct_fire_times) > 1
github agronholm / apscheduler / tests / test_triggers.py View on Github external
def test_dst_change(self, is_dst):
    """
    Test that DateTrigger works during the ambiguous "fall-back" DST period.

    Note that you should explicitly compare datetimes as strings to avoid the internal datetime
    comparison which would test for equality in the UTC timezone.

    :param is_dst: selects which of the two occurrences of the ambiguous wall-clock time is
        used as the starting point

    """
    eastern = pytz.timezone('US/Eastern')
    # US/Eastern left DST on 2013-11-03, so 01:05 on that date occurs twice and
    # ``is_dst`` picks the occurrence. An October date is unambiguous: ``is_dst``
    # would have no effect and the test would not cover the fall-back period.
    run_date = eastern.localize(datetime(2013, 11, 3, 1, 5), is_dst=is_dst)

    # normalize() is applied after the arithmetic so that the result carries
    # the UTC offset valid at the new instant.
    fire_date = eastern.normalize(run_date + timedelta(minutes=55))
    trigger = DateTrigger(run_date=fire_date, timezone=eastern)
    assert str(trigger.get_next_fire_time(None, fire_date)) == str(fire_date)
github agronholm / apscheduler / tests / testjob.py View on Github external
def test_jobs_equal(self):
    """Check job equality: same object, differing jobs, shared id, and a non-job."""
    fixture_job = self.job
    assert fixture_job == fixture_job

    # A freshly built job is expected to differ from the fixture job.
    other_trigger = DateTrigger(defaults, self.RUNTIME)
    job2 = Job(other_trigger, lambda: None, [], {}, 1, False, None, None, 1)
    assert self.job != job2

    # Once both jobs carry the same id they are expected to compare equal.
    job2.id = 123
    self.job.id = 123
    eq_(self.job, job2)

    # Comparing against an unrelated type must not report equality.
    assert self.job != 'bleh'