Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat:
    """Build the chat object for the conversation that ``message`` belongs to.

    The kind of chat is decided by the type of ``message.to_id``.

    Parameters:
        message: Raw message whose ``to_id`` peer selects the chat kind.
        users: Lookup of raw users, indexed by user id.
        chats: Lookup of raw chats/channels, indexed by chat or channel id.

    Returns:
        The result of ``parse_user_chat``, ``parse_chat_chat`` or
        ``parse_channel_chat``, depending on the peer type.
    """
    peer = message.to_id

    if isinstance(peer, types.PeerUser):
        # For an outgoing message the chat is with the recipient (the peer);
        # otherwise the user is looked up by the message's ``from_id``.
        other_party_id = peer.user_id if message.out else message.from_id
        return parse_user_chat(users[other_party_id])

    if isinstance(peer, types.PeerChat):
        return parse_chat_chat(chats[peer.chat_id])

    # Any other peer type is handled as a channel.
    return parse_channel_chat(chats[peer.channel_id])
def _parse(client, dialog: types.Dialog, messages, users, chats) -> "Dialog":
    """Convert a raw dialog into a ``Dialog`` object.

    Parameters:
        client: Client instance forwarded to the created objects.
        dialog: Raw dialog to convert.
        messages: Object exposing ``.get``; the dialog's top message is
            looked up in it by the chat id computed from ``dialog.peer``.
        users: Forwarded to ``Chat._parse_dialog``.
        chats: Forwarded to ``Chat._parse_dialog``.

    Returns:
        A ``Dialog`` populated from the raw dialog's peer, counters and flags.
    """
    peer = dialog.peer

    # Derive the id used to find this dialog's top message:
    #   user    -> the user id unchanged
    #   chat    -> the chat id negated
    #   other   -> the channel id with "-100" prepended
    if isinstance(peer, types.PeerUser):
        top_message_key = peer.user_id
    elif isinstance(peer, types.PeerChat):
        top_message_key = -peer.chat_id
    else:
        top_message_key = int("-100" + str(peer.channel_id))

    parsed_chat = Chat._parse_dialog(client, dialog.peer, users, chats)
    latest_message = messages.get(top_message_key)

    return Dialog(
        chat=parsed_chat,
        top_message=latest_message,
        unread_messages_count=dialog.unread_count,
        unread_mentions_count=dialog.unread_mentions_count,
        unread_mark=dialog.unread_mark,
        is_pinned=dialog.pinned,
        client=client,
    )
# Unsave the animation once it is sent
app.send_animation("me", "animation.gif", unsave=True)
# Keep track of the progress while uploading
def progress(current, total):
print("{:.1f}%".format(current * 100 / total))
app.send_animation("me", "animation.gif", progress=progress)
"""
file = None
try:
if os.path.exists(animation):
thumb = None if thumb is None else self.save_file(thumb)
file = self.save_file(animation, progress=progress, progress_args=progress_args)
media = types.InputMediaUploadedDocument(
mime_type=self.guess_mime_type(animation) or "video/mp4",
file=file,
thumb=thumb,
attributes=[
types.DocumentAttributeVideo(
supports_streaming=True,
duration=duration,
w=width,
h=height
),
types.DocumentAttributeFilename(file_name=os.path.basename(animation)),
types.DocumentAttributeAnimated()
]
)
elif animation.startswith("http"):
media = types.InputMediaDocumentExternal(
send_polls = None
if can_change_info:
change_info = None
if can_invite_users:
invite_users = None
if can_pin_messages:
pin_messages = None
r = self.send(
functions.channels.EditBanned(
channel=self.resolve_peer(chat_id),
user_id=self.resolve_peer(user_id),
banned_rights=types.ChatBannedRights(
until_date=until_date,
send_messages=send_messages,
send_media=send_media,
send_stickers=send_stickers,
send_gifs=send_gifs,
send_games=send_games,
send_inline=send_inline,
embed_links=embed_links,
send_polls=send_polls,
change_info=change_info,
invite_users=invite_users,
pin_messages=pin_messages
)
)
)
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
extra = {}
if tag in ["b", "strong"]:
entity = types.MessageEntityBold
elif tag in ["i", "em"]:
entity = types.MessageEntityItalic
elif tag == "u":
entity = types.MessageEntityUnderline
elif tag in ["s", "del", "strike"]:
entity = types.MessageEntityStrike
elif tag == "blockquote":
entity = types.MessageEntityBlockquote
elif tag == "code":
entity = types.MessageEntityCode
elif tag == "pre":
entity = types.MessageEntityPre
extra["language"] = ""
elif tag == "a":
url = attrs.get("href", "")
def _get_sticker_set_name(send, input_sticker_set_id):
    """Fetch the short name of a sticker set.

    Parameters:
        send: Callable that executes a raw API request and returns its result.
        input_sticker_set_id: Indexable pair; element 0 is used as the set id
            and element 1 as the access hash.

    Returns:
        The ``set.short_name`` of the response, or ``None`` when the request
        raises ``StickersetInvalid``.
    """
    try:
        sticker_set_ref = types.InputStickerSetID(
            id=input_sticker_set_id[0],
            access_hash=input_sticker_set_id[1],
        )
        request = functions.messages.GetStickerSet(stickerset=sticker_set_ref)
        response = send(request)
        return response.set.short_name
    except StickersetInvalid:
        return None
try:
decoded = utils.decode(self.audio_file_id)
fmt = " 24 else "
delete (``bool``, *optional*):
Deletes the group chat dialog after leaving (for simple group chats, not supergroups).
Defaults to False.
Example:
.. code-block:: python
# Leave chat or channel
app.leave_chat(chat_id)
# Leave basic chat and also delete the dialog
app.leave_chat(chat_id, delete=True)
"""
peer = self.resolve_peer(chat_id)
if isinstance(peer, types.InputPeerChannel):
return self.send(
functions.channels.LeaveChannel(
channel=self.resolve_peer(chat_id)
)
)
elif isinstance(peer, types.InputPeerChat):
r = self.send(
functions.messages.DeleteChatUser(
chat_id=peer.chat_id,
user_id=types.InputPeerSelf()
)
)
if delete:
self.send(
functions.messages.DeleteHistory(
# Send photo by uploading from URL
app.send_photo("me", "https://i.imgur.com/BQBTP7d.png")
# Add caption to a photo
app.send_photo("me", "photo.jpg", caption="Holidays!")
# Send self-destructing photo
app.send_photo("me", "photo.jpg", ttl_seconds=10)
"""
file = None
try:
if os.path.exists(photo):
file = self.save_file(photo, progress=progress, progress_args=progress_args)
media = types.InputMediaUploadedPhoto(
file=file,
ttl_seconds=ttl_seconds
)
elif photo.startswith("http"):
media = types.InputMediaPhotoExternal(
url=photo,
ttl_seconds=ttl_seconds
)
else:
media = utils.get_input_media_from_file_id(photo, file_ref, 2)
while True:
try:
r = self.send(
functions.messages.SendMedia(
peer=self.resolve_peer(chat_id),
r = self.send(
functions.messages.SendMessage(
peer=self.resolve_peer(chat_id),
no_webpage=disable_web_page_preview or None,
silent=disable_notification or None,
reply_to_msg_id=reply_to_message_id,
random_id=self.rnd_id(),
schedule_date=schedule_date,
reply_markup=reply_markup.write() if reply_markup else None,
message=message,
entities=entities
)
)
if isinstance(r, types.UpdateShortSentMessage):
peer = self.resolve_peer(chat_id)
peer_id = (
peer.user_id
if isinstance(peer, types.InputPeerUser)
else -peer.chat_id
)
return pyrogram.Message(
message_id=r.id,
chat=pyrogram.Chat(
id=peer_id,
type="private",
client=self
),
text=message,