Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, library, shard_count=-1, prefixes='py.', message_cache_max=2500, **kwargs):
    """Set up the client's initial, not-yet-connected state.

    Args:
        library: Value stored in ``sniffio.current_async_library_cvar``
            before any other attribute is created.
        shard_count: When below 1 the shard list is empty; otherwise the
            shard list is ``list(range(shard_count))``.
        prefixes: Either a list, which is stored as-is, or any other
            value, which is wrapped in a one-element list.
        message_cache_max: ``maxlen`` of the ``messages`` deque.
        **kwargs: Accepted but not read anywhere in this method.
    """
    super().__init__()

    # Record the async library first; the event created below comes from
    # anyio (presumably it consults this value -- confirm against anyio).
    sniffio.current_async_library_cvar.set(library)

    # Identity / lifecycle placeholders.
    self.token = ''
    self.is_bot = True
    self._boot_up_time = None
    self.running = anyio.create_event()

    # Network handles.
    self.api = HttpClient(self)
    self.session = asks.Session()  # public session

    # Shard ids: nothing for a non-positive count, else 0..shard_count-1.
    if shard_count < 1:
        self.shards = []
    else:
        self.shards = list(range(shard_count))

    # Object caches.
    self.users = Collection(User)
    self.guilds = Collection(Guild)
    self.channels = Collection(Channel)
    self.messages = deque(maxlen=message_cache_max)
    self.commands = CommandCollection(self)
    self.webhooks = Collection(Webhook, indexor='name')

    # Always keep prefixes as a list, wrapping a lone non-list value.
    if not isinstance(prefixes, list):
        prefixes = [prefixes]
    self.prefixes = prefixes

    self._nonces = {}
    self.user = None
def claim_worker_thread(backend) -> typing.Generator[Any, None, None]:
    """Bind the current thread to ``backend`` for the duration of one yield.

    Looks up the already-imported module ``'anyio._backends._' + backend``
    in ``sys.modules`` (a ``KeyError`` propagates if it is not loaded),
    stores it on ``_local.current_async_module``, and sets
    ``sniffio.current_async_library_cvar`` to ``backend``. After the single
    ``yield`` -- whether the caller's code finished or raised -- the context
    variable is reset to its previous value and the ``_local`` attribute is
    deleted.

    NOTE(review): this generator is presumably wrapped by a
    ``contextmanager`` decorator that lies outside this view -- confirm.

    Args:
        backend: Suffix of the backend module name; also the value placed in
            the sniffio context variable.
    """
    module_key = 'anyio._backends._' + backend
    _local.current_async_module = sys.modules[module_key]

    library_cvar = sniffio.current_async_library_cvar
    reset_token = library_cvar.set(backend)
    try:
        yield
    finally:
        # Undo in reverse order of setup: context variable first, then the
        # thread-local module reference.
        library_cvar.reset(reset_token)
        del _local.current_async_module