Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@wraps(fn)
async def wrapper(self, device, *args, **kwargs):
    """Await ``fn`` and convert any ``Exception`` into a logged ``False``.

    Forwards ``self``, ``device`` and all extra arguments to ``fn``.

    Returns:
        Whatever ``fn`` returns on success; ``False`` if ``fn`` raised an
        ``Exception``.
    """
    try:
        result = await fn(self, device, *args, **kwargs)
    except Exception as err:
        # Short message at error level, full traceback only at debug level.
        # NOTE(review): ``_`` presumably formats/translates the message --
        # it is defined outside this block; confirm.
        self._log.error(
            _('failed to {0} {1}: {2}',
              fn.__name__, device, exc_message(err)))
        self._log.debug(format_exc())
        return False
    else:
        return result
return wrapper
@wraps(func)
def wrapper(self, object_path, *args, **kwargs):
    """Push a call to ``func`` onto the event queue for ``object_path``.

    ``func`` is not called here; it is handed, together with ``self``,
    ``object_path`` and the remaining arguments, to the ``push`` method of
    ``self._event_queue[object_path]``. Returns ``None``.
    """
    # NOTE(review): when/if the queue invokes ``func`` is decided by the
    # queue object, which is not visible from this block.
    queue = self._event_queue[object_path]
    queue.push(func, self, object_path, *args, **kwargs)
return wrapper
@wraps(func)
def runner(*args, **kwargs):
    """Call ``func`` and pass what it returns to ``ensure_future``.

    Returns:
        The object returned by ``ensure_future``.
    """
    awaitable = func(*args, **kwargs)
    return ensure_future(awaitable)
return runner
@wraps(func)
async def coro(*args, **kwargs):
    """Coroutine wrapper that calls ``func`` directly and returns its result.

    The call to ``func`` is a plain (non-awaited) call made inside the
    coroutine body; its return value is passed through unchanged.
    """
    result = func(*args, **kwargs)
    return result
return coro