Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@Streamable.streamable
async def unread(self, *args, **kwargs) -> ListingGenerator:
return ListingGenerator(self.reddit, API_PATH["message_unread"], *args, **kwargs)
def wrapper(
_func: SYNC_OR_ASYNC_ITERABLE):
return Streamable(_func, max_wait, attribute_name)
def streamable(cls, func: SYNC_OR_ASYNC_ITERABLE = None, max_wait: int = 16, attribute_name: str = "fullname"):
if func:
return Streamable(func)
else:
def wrapper(
_func: SYNC_OR_ASYNC_ITERABLE):
return Streamable(_func, max_wait, attribute_name)
return wrapper