Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def spool(self, *args, **kwargs):
    """Enqueue a spooler job built from ``self.base_dict`` plus call arguments.

    The job dictionary starts as a copy of ``self.base_dict``. If any
    positional argument is given, the first one is merged in with
    ``dict.update`` (so it must be a mapping or an iterable of key/value
    pairs); remaining positional arguments are ignored. Keyword arguments
    are merged last and therefore win on key collisions.

    Returns whatever ``uwsgi.spool`` returns.
    """
    # Work on a copy: updating self.base_dict in place would make the
    # arguments of this call persist and leak into every later job.
    arguments = self.base_dict.copy()
    if args:
        arguments.update(args[0])
    if kwargs:
        arguments.update(kwargs)
    return uwsgi.spool(arguments)
def application(env, start_response):
    """WSGI callable that enqueues one demo spooler job and reports it.

    Args:
        env: the WSGI environ mapping (not read here).
        start_response: the WSGI ``start_response`` callable.

    Returns:
        A one-element list holding the response body as a bytestring.
    """
    name = uwsgi.spool({
        'Hello': 'World',
        'I am a': 'long running task'
    })
    print("spooled as %s" % name)
    start_response('200 Ok', [
        ('Content-Type', 'text/plain'),
        ('uWSGI-Status', 'spooled'),
    ])
    # PEP 3333: the body must be an iterable of bytestrings. A bare str is
    # iterated one character at a time and is not bytes on Python 3.
    return [b"task spooled"]
def spool(self, *args, **kwargs):
    """Enqueue a spooler job that carries ``uwsgi.SPOOL_RETRY`` as ``ud_spool_ret``.

    The job dictionary starts as a copy of ``self.base_dict`` with
    ``'ud_spool_ret'`` set to ``str(uwsgi.SPOOL_RETRY)``. If any positional
    argument is given, the first one is merged in with ``dict.update``;
    keyword arguments are merged last. Because both merges happen after
    ``'ud_spool_ret'`` is set, a caller can override it.

    Returns whatever ``uwsgi.spool`` returns.
    """
    # Work on a copy: updating self.base_dict in place would make the
    # arguments of this call persist and leak into every later job.
    arguments = self.base_dict.copy()
    # NOTE(review): presumably read by the spooler-side handler as the
    # job's return code - confirm against the consumer.
    arguments['ud_spool_ret'] = str(uwsgi.SPOOL_RETRY)
    if args:
        arguments.update(args[0])
    if kwargs:
        arguments.update(kwargs)
    return uwsgi.spool(arguments)
def __call__(self, *args, **kwargs):
    """Build a spooler message from the call arguments and enqueue it.

    The message starts as a copy of ``self.base_dict``.

    * When ``self.pass_arguments`` is true, the keys ``message_dict``,
      ``spooler``, ``priority``, ``at`` and ``body`` are removed from
      ``kwargs`` and copied into the message as-is. The positional
      arguments and the remaining keyword arguments are then pickled into
      the ``'args'`` and ``'kwargs'`` entries.
    * Otherwise the first positional argument (if any) and the keyword
      arguments are merged straight into the message.

    Returns the result of ``uwsgi.spool`` applied to the message after it
    has gone through ``_encode_to_spooler``.
    """
    message = self.base_dict.copy()
    if self.pass_arguments:
        # Pull the recognised control keys out first so they are not
        # pickled along with the rest of the keyword arguments.
        for option in ('message_dict', 'spooler', 'priority', 'at', 'body'):
            if option in kwargs:
                message[option] = kwargs.pop(option)
        message['args'] = pickle.dumps(args)
        message['kwargs'] = pickle.dumps(kwargs)
    else:
        if args:
            message.update(args[0])
        if kwargs:
            message.update(kwargs)
    return uwsgi.spool(_encode_to_spooler(message))
def producer():
    """Enqueue one job for a randomly chosen project, then sleep two seconds.

    The job is passed to ``uwsgi.spool`` with ``ud_spool_func`` set to
    ``"consumer"`` and ``dest`` set to an element picked from the
    ``projects`` sequence, which is defined outside this function.
    """
    enqueue = uwsgi.spool
    destination = random.choice(projects)
    enqueue(ud_spool_func="consumer", dest=destination)
    time.sleep(2)
def spool(self, *args, **kwargs):
    """Enqueue a job tagged with ``uwsgi.SPOOL_RETRY`` without touching shared state.

    Steps:
      1. copy ``self.base_dict``;
      2. set ``'ud_spool_ret'`` to ``str(uwsgi.SPOOL_RETRY)``;
      3. merge the first positional argument, if one was passed, using
         ``dict.update`` (extra positional arguments are ignored);
      4. merge the keyword arguments, which win on key collisions.

    Returns the value produced by ``uwsgi.spool`` for the assembled dict.
    """
    # Copy first: the original aliased self.base_dict, so each call's
    # arguments stayed in it and contaminated later jobs.
    arguments = self.base_dict.copy()
    # NOTE(review): the meaning of 'ud_spool_ret' is defined by whatever
    # consumes the job - confirm there.
    arguments['ud_spool_ret'] = str(uwsgi.SPOOL_RETRY)
    if args:
        arguments.update(args[0])
    if kwargs:
        arguments.update(kwargs)
    return uwsgi.spool(arguments)
def spool(self, *args, **kwargs):
    """Enqueue a spooler job without mutating ``self.base_dict``.

    A copy of ``self.base_dict`` is extended with the first positional
    argument, if one was passed (merged through ``dict.update``; further
    positional arguments are ignored), and then with the keyword
    arguments, which take precedence on duplicate keys.

    Returns the value produced by ``uwsgi.spool`` for the assembled dict.
    """
    # Copy first: the original aliased self.base_dict, so each call's
    # arguments stayed in it and contaminated later jobs.
    arguments = self.base_dict.copy()
    if args:
        arguments.update(args[0])
    if kwargs:
        arguments.update(kwargs)
    return uwsgi.spool(arguments)
def spool(*args, **kwargs):
    """Serialise a call and hand it to the uWSGI spooler.

    ``f`` and ``dumps`` are free variables taken from a scope that is not
    visible here. The job sent to ``uwsgi.spool`` has three byte-string
    keys: ``b'f'``, ``b'args'`` and ``b'kwargs'``, each holding the
    ``dumps`` output for the corresponding object. Nothing is returned.
    """
    # Imported at call time, as in the original.
    import uwsgi

    enqueue = uwsgi.spool
    job = {
        b'f': dumps(f),
        b'args': dumps(args),
        b'kwargs': dumps(kwargs),
    }
    enqueue(job)