Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_next():
loop = asyncio.new_event_loop()
def t():
return 1
t = crontab('* * * * * *', func=t, loop=loop)
future = asyncio.ensure_future(t.next(), loop=loop)
loop.run_until_complete(future)
assert future.result() == 1
def test_coro_next_raise():
loop = asyncio.new_event_loop()
@crontab('* * * * * *', loop=loop)
@asyncio.coroutine
def t():
raise CustomError()
future = asyncio.ensure_future(t.next(), loop=loop)
with pytest.raises(CustomError):
loop.run_until_complete(future)
def test_null_callback():
loop = asyncio.new_event_loop()
t = crontab('* * * * * *', loop=loop)
assert t.handle is None # not started
future = asyncio.ensure_future(t.next(4), loop=loop)
loop.run_until_complete(future)
assert future.result() == (4,)
def test_next_raise():
loop = asyncio.new_event_loop()
@crontab('* * * * * *', loop=loop)
def t():
raise CustomError()
future = asyncio.ensure_future(t.next(), loop=loop)
with pytest.raises(CustomError):
loop.run_until_complete(future)
def test_coro_next():
loop = asyncio.new_event_loop()
@crontab('* * * * * *', loop=loop)
@asyncio.coroutine
def t():
return 1
future = asyncio.ensure_future(t.next(), loop=loop)
loop.run_until_complete(future)
assert future.result() == 1