Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from arq import Actor, BaseWorker, concurrent
class ActorTest(Actor):
@concurrent
async def foo(self, a, b=0):
with open('foo', 'w') as f:
r = a + b
f.write('{}'.format(r))
class Worker(BaseWorker):
signature = 'foobar'
shadows = [ActorTest]
class WorkerSignalQuit(Worker):
"""
worker which simulates receiving sigint after 2 jobs
"""
@concurrent
async def concurrent_func(self, v):
with open('events', 'a') as f:
f.write('concurrent_func[{}],'.format(v))
async def shutdown(self):
with open('events', 'a') as f:
f.write('shutdown[{}],'.format(self.is_shadow))
class StartupWorker(BaseWorker):
shadows = [DemoActor, StartupActor]
class FastShutdownWorker(BaseWorker):
shadows = []
shutdown_delay = 0.1
class DrainQuit2(Drain):
def _job_callback(self, task):
super()._job_callback(task)
if self.jobs_complete >= 2:
self.running = False
class WorkerQuit(Worker):
"""
worker which stops taking new jobs after 2 jobs
"""
max_concurrent_tasks = 1
from arq import Actor, BaseWorker, concurrent
class ActorTest(Actor):
@concurrent
async def foo(self, a, b=0):
with open('foo', 'w') as f:
r = a + b
f.write('{}'.format(r))
class Worker(BaseWorker):
signature = 'foobar'
shadows = [ActorTest]
class WorkerSignalQuit(Worker):
"""
worker which simulates receiving sigint after 2 jobs
"""
max_concurrent_tasks = 1
async def run_job(self, *args):
await super().run_job(*args)
if self.jobs_complete >= 2:
self.handle_sig(2)
async def startup(self):
with open('events', 'a') as f:
f.write('startup[{}],'.format(self.is_shadow))
@concurrent
async def concurrent_func(self, v):
with open('events', 'a') as f:
f.write('concurrent_func[{}],'.format(v))
async def shutdown(self):
with open('events', 'a') as f:
f.write('shutdown[{}],'.format(self.is_shadow))
class StartupWorker(BaseWorker):
shadows = [DemoActor, StartupActor]
class FastShutdownWorker(BaseWorker):
shadows = []
shutdown_delay = 0.1
class DrainQuit2(Drain):
def _job_callback(self, task):
super()._job_callback(task)
if self.jobs_complete >= 2:
self.running = False
class WorkerQuit(Worker):
class WorkerQuit(Worker):
"""
worker which stops taking new jobs after 2 jobs
"""
max_concurrent_tasks = 1
drain_class = DrainQuit2
class WorkerFail(Worker):
async def run_job(self, j):
raise RuntimeError('foobar')
class MockRedisWorker(MockRedisMixin, BaseWorker):
shadows = [MockRedisDemoActor]
class DrainQuitImmediate(Drain):
def _job_callback(self, task):
super()._job_callback(task)
self.running = False
class MockRedisWorkerQuit(MockRedisWorker):
drain_class = DrainQuitImmediate
class FoobarActor(MockRedisDemoActor):
name = 'foobar'
tx_rxtestproxyqueue.put(data)
def rx_receiveroutine():
if rx_testproxyqueue.empty():
return None
else:
return rx_testproxyqueue.get_nowait()
####################################
## Receive
##################################
# Create object
testrxsm = arq.ReceiveArqStateMachine(rx_transmitroutine, rx_receiveroutine)
# Set state machine to START
testrxsm.updatestate(arq.STATE_START)
####################################
## Transmit
##################################
# Create object
testtxsm = arq.TransmitArqStateMachine(tx_transmitroutine, tx_receiveroutine)
####################################
## Operations
##################################
print "Sleeping prior to transmit"
listdata = ['this ', 'is', ' a', ' test', '.']
def transmitroutine(data):
print "Transmitting: ", data
def receiveroutine():
if testrxproxyqueue.empty():
return None
else:
return testrxproxyqueue.get_nowait()
# Create object
testrxsm = arq.ReceiveArqStateMachine(transmitroutine, receiveroutine)
# Set state machine to START
print "Updating to START State"
testrxsm.updatestate(arq.STATE_START)
# Add data to Fake RX Proxy queue
testrxproxyqueue.put_nowait("This")
time.sleep(2)
testrxproxyqueue.put_nowait("is")
time.sleep(1)
testrxproxyqueue.put_nowait("a")
time.sleep(3)
testrxproxyqueue.put_nowait("test")
time.sleep(1)
testrxproxyqueue.put_nowait(".")
async def test_repeat_job_result(arq_redis: ArqRedis, worker):
j1 = await arq_redis.enqueue_job('foobar', _job_id='job_id')
assert isinstance(j1, Job)
assert await j1.status() == JobStatus.queued
assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None
await worker(functions=[foobar]).run_check()
assert await j1.status() == JobStatus.complete
assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None
async def test_result_timeout(arq_redis: ArqRedis):
j = Job('foobar', arq_redis)
with pytest.raises(asyncio.TimeoutError):
await j.result(0.1, pole_delay=0)
async def test_repeat_job(arq_redis: ArqRedis):
j1 = await arq_redis.enqueue_job('foobar', _job_id='job_id')
assert isinstance(j1, Job)
j2 = await arq_redis.enqueue_job('foobar', _job_id='job_id')
assert j2 is None