Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): these definitions look like fixtures gathered from several
# separate snippets (``decorated`` appears twice, ``instmethod`` takes
# ``self``).  ``concurrent`` is presumably Pebble's ``pebble.concurrent``
# module, whose ``process`` decorator runs the function in a child process
# -- confirm against the file's imports.


@concurrent.process
def error_decorated():
    """Unconditionally raise ``RuntimeError("BOOM!")``."""
    raise RuntimeError("BOOM!")


@concurrent.process(daemon=False)
def daemon_keyword_decorated():
    """Return the ``daemon`` flag of the process executing this function."""
    return multiprocessing.current_process().daemon


@concurrent.process
def decorated(argument, keyword_argument=0):
    """A docstring."""
    return argument + keyword_argument


@concurrent.process(timeout=0.1)
def long_decorated():
    """Sleep for 10 seconds, far longer than the 0.1 timeout given above."""
    time.sleep(10)


# NOTE(review): the timeout here is a string rather than a number; presumably
# this exercises the decorator's parameter validation and may raise as soon
# as the decorator is applied -- confirm before importing this at module level.
@concurrent.process(timeout='Foo')
def function():
    """Do nothing and return ``None``."""
    return


@concurrent.process(timeout=0.1)
def sigterm_decorated():
    """Ignore SIGTERM, then sleep for 10 seconds."""
    # With SIG_IGN installed, a plain SIGTERM will not stop this function.
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    time.sleep(10)


@concurrent.process(name='concurrent_process_name')
def name_keyword_decorated():
    """Return the ``name`` of the process executing this function."""
    return multiprocessing.current_process().name


@concurrent.process
def pickling_error_decorated():
    """Return a freshly created ``threading.Event``.

    NOTE(review): judging by the name, the point is that the returned
    object cannot be pickled back to the caller -- confirm.
    """
    event = threading.Event()
    return event


# NOTE(review): same name and body as the earlier ``decorated``; if both live
# in one module this definition replaces the first.
@concurrent.process
def decorated(argument, keyword_argument=0):
    """A docstring."""
    return argument + keyword_argument


# NOTE(review): takes ``self`` and reads ``self.b``, so this is presumably a
# method whose enclosing class header is outside this view -- confirm and
# re-indent under that class if so.
@concurrent.process
def instmethod(self):
    """Return the ``b`` attribute of ``self``."""
    return self.b