Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@concurrent
async def concurrent_func(self, v):
    """Append an invocation marker recording the argument *v* to 'events'."""
    marker = 'concurrent_func[{}],'.format(v)
    with open('events', 'a') as f:
        f.write(marker)
async def shutdown(self):
    """Append a shutdown marker tagged with this actor's shadow status."""
    marker = 'shutdown[{}],'.format(self.is_shadow)
    with open('events', 'a') as f:
        f.write(marker)
class StartupWorker(BaseWorker):
    """Worker fixture that shadows both the demo and startup actors."""

    # actors instantiated (as shadows) when this worker starts
    shadows = [DemoActor, StartupActor]
class FastShutdownWorker(BaseWorker):
    """Worker fixture with no shadows and a very short shutdown delay."""

    shadows = []
    # keep shutdown fast so tests exercising termination don't stall
    shutdown_delay = 0.1
class DrainQuit2(Drain):
    """Drain that stops accepting work once two jobs have completed."""

    def _job_callback(self, task):
        # run the normal bookkeeping first, then check the completion count
        super()._job_callback(task)
        if self.jobs_complete >= 2:
            # flag read by the drain loop; clearing it makes the loop exit
            self.running = False
class WorkerQuit(Worker):
    """
    worker which stops taking new jobs after 2 jobs
    """

    max_concurrent_tasks = 1
    # NOTE(review): DrainQuit2 (defined above) is what actually stops the
    # worker after two completed jobs; this truncated copy of the class was
    # missing the assignment, so the docstring's promise did not hold.
    drain_class = DrainQuit2
from arq import Actor, BaseWorker, concurrent
class ActorTest(Actor):
    """Actor whose `foo` job writes the sum of its arguments to the 'foo' file."""

    @concurrent
    async def foo(self, a, b=0):
        total = a + b
        with open('foo', 'w') as f:
            f.write('{}'.format(total))
class Worker(BaseWorker):
    """Worker fixture that runs ActorTest jobs."""

    # presumably a shared secret/identifier for this worker's queue —
    # confirm against arq BaseWorker docs
    signature = 'foobar'
    shadows = [ActorTest]
class WorkerSignalQuit(Worker):
    """
    worker which simulates receiving sigint after 2 jobs
    """

    max_concurrent_tasks = 1

    async def run_job(self, *args):
        # run the job normally first, then fake the signal
        await super().run_job(*args)
        if self.jobs_complete >= 2:
            # 2 is the numeric value of SIGINT: pretend ctrl-C was pressed
            self.handle_sig(2)
async def startup(self):
    """Append a startup marker tagged with this actor's shadow status."""
    marker = 'startup[{}],'.format(self.is_shadow)
    with open('events', 'a') as f:
        f.write(marker)
@concurrent
async def concurrent_func(self, v):
    """Record that the concurrent job ran with argument *v*."""
    with open('events', 'a') as events_file:
        events_file.write('concurrent_func[{}],'.format(v))
async def shutdown(self):
    """Record that shutdown ran, noting whether this instance is a shadow."""
    with open('events', 'a') as events_file:
        events_file.write('shutdown[{}],'.format(self.is_shadow))
class StartupWorker(BaseWorker):
    """Worker fixture that shadows both the demo and startup actors."""

    shadows = [DemoActor, StartupActor]
class FastShutdownWorker(BaseWorker):
    """Worker fixture with no shadows and a very short shutdown delay."""

    shadows = []
    # small delay so termination-focused tests finish quickly
    shutdown_delay = 0.1
class DrainQuit2(Drain):
    """Drain that stops accepting work once two jobs have completed."""

    def _job_callback(self, task):
        super()._job_callback(task)
        if self.jobs_complete >= 2:
            # flag read by the drain loop; clearing it makes the loop exit
            self.running = False
class WorkerQuit(Worker):
    """
    worker which stops taking new jobs after 2 jobs
    """

    # NOTE(review): the pasted source repeated the `class WorkerQuit(Worker):`
    # header; a class statement with an empty suite is a syntax error, so a
    # single header is kept here.
    max_concurrent_tasks = 1
    drain_class = DrainQuit2
class WorkerFail(Worker):
    """Worker whose jobs always fail, for exercising error-handling paths."""

    async def run_job(self, j):
        # unconditionally blow up so tests can assert failure handling
        raise RuntimeError('foobar')
class MockRedisWorker(MockRedisMixin, BaseWorker):
    """Worker fixture backed by the mock redis, shadowing the mock demo actor."""

    shadows = [MockRedisDemoActor]
class DrainQuitImmediate(Drain):
    """Drain that stops right after the first job callback fires."""

    def _job_callback(self, task):
        super()._job_callback(task)
        # stop the drain loop immediately, regardless of job counts
        self.running = False
class MockRedisWorkerQuit(MockRedisWorker):
    """Mock-redis worker that exits as soon as one job completes."""

    drain_class = DrainQuitImmediate
class FoobarActor(MockRedisDemoActor):
    """Mock demo actor registered under a different actor name."""

    # overrides the actor's registered name
    name = 'foobar'
raise StopJob('stopping job normally')
@concurrent
async def stop_job_warning(self):
    """Job that stops itself, with the stop reported as a warning."""
    raise StopJob('stopping job with warning', warning=True)
class RealJobActor(DemoActor):
    """DemoActor variant using the `Job` job class."""

    job_class = Job
class MockRedisDemoActor(MockRedisMixin, DemoActor):
    """DemoActor wired to the in-memory mock redis."""

    pass
class Worker(BaseWorker):
    """Basic worker fixture shadowing only DemoActor."""

    shadows = [DemoActor]
class StartupActor(Actor):
    """Actor fixture used to observe startup/shutdown lifecycle events."""

    # presumably a job class producing a constant/deterministic id for
    # repeatable tests — confirm against JobConstID's definition
    job_class = JobConstID
async def startup(self):
    """Record that startup ran, noting whether this instance is a shadow."""
    with open('events', 'a') as events_file:
        events_file.write('startup[{}],'.format(self.is_shadow))
@concurrent
async def concurrent_func(self, v):
    """Append an 'events' marker recording this invocation's argument."""
    entry = 'concurrent_func[{}],'.format(v)
    with open('events', 'a') as f:
        f.write(entry)
async def shutdown(self):
    """Overwrite the 'not_unique' file with a fixed marker value."""
    with open('not_unique', 'w') as f:
        # was an f-string with no placeholders (lint F541); plain literal
        f.write('not_unique the value')
def _now(self):
    """Return the next canned datetime from 'datatime.pkl', if present.

    The pickle file holds a list of datetimes; each call pops the first
    entry and writes the remainder back, so successive calls replay the
    list in order. Falls back to the parent implementation (the real
    clock) when the fixture file does not exist.
    """
    try:
        with open('datatime.pkl', 'rb') as fixture:
            queued = pickle.load(fixture)
        current = queued.pop(0)
        with open('datatime.pkl', 'wb') as fixture:
            pickle.dump(queued, fixture)
        return current
    except FileNotFoundError:
        return super()._now()
class CronWorker(BaseWorker):
    """Worker fixture that runs the cron actor."""

    shadows = [CronActor]
class ParentActor(MockRedisMixin, Actor):
    """Actor whose `save_value` job writes the class attribute `v` to a file."""

    v = 'Parent'

    @concurrent
    async def save_value(self, file_name):
        content = self.v
        with open(file_name, 'w') as f:
            f.write(content)
class ChildActor(ParentActor):
    """ParentActor subclass; its save_value job writes 'Child' instead."""

    v = 'Child'
class ParentChildActorWorker(MockRedisMixin, BaseWorker):
    """Worker shadowing both the parent and child actors."""

    shadows = [ParentActor, ChildActor]
class ReEnqueueActor(DemoActor):
    """DemoActor variant with job re-enqueueing enabled."""

    # presumably makes the worker re-enqueue jobs on shutdown/failure —
    # confirm against arq's re_enqueue_jobs documentation
    re_enqueue_jobs = True
class CronActor(Actor):
    """Actor with a cron job used to test scheduled execution."""

    # 03:00:00 makes it very unlikely the job fires because the wall clock
    # actually hits that time; run_at_startup triggers it deterministically
    job_class = JobConstID

    @cron(hour=3, minute=0, second=0, run_at_startup=True)
    async def save_foobar(self):
        with open('foobar', 'w') as f:
            # was an f-string with no placeholders (lint F541); plain literal
            f.write('foobar the value')
from arq import BaseWorker, DatetimeJob
from .donorfy import DonorfyActor
from .emails import EmailActor
from .settings import Settings
class Worker(BaseWorker):
    """Worker running the Donorfy and email actors with shared app settings."""

    job_class = DatetimeJob
    shadows = [DonorfyActor, EmailActor]

    def __init__(self, **kwargs):  # pragma: no cover
        # build Settings here so the worker is self-configuring when
        # instantiated, and point it at the configured redis instance
        self.settings = Settings()
        kwargs['redis_settings'] = self.settings.redis_settings
        super().__init__(**kwargs)

    async def shadow_kwargs(self):
        # every shadow actor receives the shared Settings instance
        kwargs = await super().shadow_kwargs()
        kwargs['settings'] = self.settings
        return kwargs
async def shutdown(app):
    """aiohttp on_shutdown hook: close the app's shared downloader."""
    downloader = app['downloader']
    await downloader.close()
def create_app():
    """Build the aiohttp application: routes, shared downloader, sessions."""
    app = web.Application()
    app.router.add_get('/', index)
    app.router.add_post('/start-job/', start_job)
    # single shared Downloader, closed by the registered on_shutdown handler
    app['downloader'] = Downloader(redis_settings=redis_settings)
    app.on_shutdown.append(shutdown)
    # SimpleCookieStorage is unencrypted — demo/testing use only
    session_setup(app, SimpleCookieStorage())
    return app
class Worker(BaseWorker):
    """Worker for the demo app, shadowing the Downloader actor."""

    # used by `arq app.py` command
    shadows = [Downloader]
    # set to small value so we can play with timeouts
    timeout_seconds = 10

    def __init__(self, *args, **kwargs):
        # force the demo redis settings regardless of caller-supplied kwargs
        kwargs['redis_settings'] = redis_settings
        super().__init__(*args, **kwargs)
if __name__ == '__main__':
    # when called directly run the webserver
    app = create_app()
    web.run_app(app, port=8000)