Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def wrapper(*args, **kwargs):
    # Awaits ``func`` and treats errors matching ``exception`` according to
    # the ``ignore`` / ``exception_handler`` / ``just_log`` names taken from
    # the enclosing scope.
    try:
        outcome = await func(*args, **kwargs)
    except exception as caught:
        if ignore:
            # The exception object itself is handed back in place of a result
            return caught
        if exception_handler is not None:
            await exception_handler(caught, *args, **kwargs)
            return None
        if just_log:
            logger.error(
                "While {func} was handling error occurred \n\n{traceback}",
                func=func.__name__,
                traceback=traceback.format_exc(),
            )
        # A matching error never propagates: the wrapper yields None instead
        return None
    else:
        return outcome
    finally:
        # Emitted on every exit path, including when an error propagates
        logger.debug(f"{func.__name__} successfully handled with swear")
f"{time.localtime()} - DELAY {delay * 5} sec\n"
f"Check your internet connection. Maybe VK died, request returned: {response}"
f"Error appeared after request: {method}",
)
await asyncio.sleep(delay * 5)
delay += 1
response = await request(*req)
logger.critical(
f"--- {time.strftime('%m-%d %H:%M:%S', time.localtime())}\n"
f"- METHOD SUCCESS after {5 * sum(range(1, delay))} sec\n"
f"RESPONSE: {response}\n",
)
if "error" in response:
logger.debug(
"Error after request {method}, response: {r}", method=method, r=response
)
if throw_errors:
raise VKError(
[response["error"]["error_code"], response["error"]["error_msg"]]
)
return response
return response["response"]
def get_id_by_token(token: str) -> int:
    """
    Get user id from token
    :param token: token passed to the users.get API request
    :return: "id" field of the first item in the API response
    :raises VKError: when the API response contains an "error" key
    """
    logger.debug("Making API request users.get to get user_id")
    # The coroutine is driven to completion synchronously on the event loop
    loop = asyncio.get_event_loop()
    payload = loop.run_until_complete(request("users.get", {}, token))
    if "error" not in payload:
        return payload["response"][0]["id"]
    raise VKError(0, "Token is invalid")
def get_id_by_token(token: str, throw_exc: bool = True) -> typing.Union[int, bool]:
    """
    Get group id from token
    :param token: token passed to the groups.getById API request
    :param throw_exc: raise VKError on an error response when true,
        return False instead when false
    :return: "id" field of the first item in the API response, or False
        when the response contains an error and throw_exc is false
    :raises VKError: when the response contains an "error" key and
        throw_exc is true
    """
    logger.debug("Making API request groups.getById to get group_id")
    # The coroutine is driven to completion synchronously on the event loop
    loop = asyncio.get_event_loop()
    payload = loop.run_until_complete(request("groups.getById", {}, token))
    if "error" not in payload:
        return payload["response"][0]["id"]
    if not throw_exc:
        return False
    raise VKError(0, "Token is invalid")
async def run(self, skip_updates: bool, wait: int = DEFAULT_WAIT):
""" Run bot polling forever
Can be manually stopped with:
bot.stop()
"""
self.__wait = wait
logger.debug("Polling will be started. Is it OK?")
if self.__secret is not None:
logger.warning("You set up secret and started polling. Removing secret")
self.__secret = None
if not self.status.dispatched:
self.middleware.add_middleware(self.on.pre_p)
await self.on.dispatch(self.get_current_rest)
self.status.dispatched = True
if not skip_updates:
await self.get_updates()
await self.get_server()
logger.info("Polling successfully started. Press Ctrl+C to stop it")
while not self._stop:
def _check_secret(self, event: dict, secret: typing.Optional[str] = None):
"""
Match secret code with current secret
:param event:
:return:
"""
if self.__secret or secret:
logger.debug(
"Checking secret for event ({secret})", secret=event.get("secret")
)
return event.get("secret") == (self.__secret or secret)
return True
self,
method: str,
params: dict,
throw_errors: bool = None,
response_model=None,
raw_response: bool = False,
):
response = await request(
method,
params,
await self.token_generator.get_token(method=method, params=params),
request_instance=self,
error_handler=self.error_handler,
)
logger.debug("Response: {}", response)
if not response_model or raw_response:
return response["response"]
return response_model(**response).response
async def dispatch(self, user: AnyUser) -> None:
    """
    Merge what is registered on another user object into this one
    :param user: object whose handlers, handled errors, middleware and
        branches are added to the current object
    :return: None
    """
    # Handlers
    self.on.concatenate(user.on)

    # Handled errors
    known_errors = self.error_handler.handled_errors
    known_errors.update(user.error_handler.handled_errors)

    # Middleware
    self.middleware.middleware += user.middleware.middleware

    # Branches
    extra_branches = user.branch.branches
    self.branch.add_branches(extra_branches)

    logger.debug("Bot has been successfully dispatched")
async def dispatch(self, bot: AnyBot):
    """
    Merge what is registered on another bot object into this one
    :param bot: object whose handlers, handled errors, middleware and
        branches are added to the current object
    :return: None
    """
    # Handlers
    self.on.concatenate(bot.on)

    # Handled errors
    known_errors = self.error_handler.handled_errors
    known_errors.update(bot.error_handler.handled_errors)

    # Middleware
    self.middleware.middleware += bot.middleware.middleware

    # Branches
    extra_branches = bot.branch.branches
    self.branch.add_branches(extra_branches)

    logger.debug("Bot has been successfully dispatched")