Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
from typing import Union
import pyrogram
from pyrogram.api import functions, types
from ...ext import BaseClient
class GetChat(BaseClient):
def get_chat(
self,
chat_id: Union[int, str]
) -> Union["pyrogram.Chat", "pyrogram.ChatPreview"]:
"""Get up to date information about a chat.
Information includes the current name of the user for one-on-one conversations, the current username of a user, group or
channel, etc.
Parameters:
chat_id (``int`` | ``str``):
Unique identifier (int) or username (str) of the target chat.
Unique identifier for the target chat in form of a *t.me/joinchat/* link, identifier (int) or username
of the target channel/supergroup (in the format @username).
Returns:
@pyrogram.Client.on_message(pyrogram.Filters.command(["rename"]))
async def rename_doc(bot, update):
if update.from_user.id not in Config.AUTH_USERS:
await bot.delete_messages(
chat_id=update.chat.id,
message_ids=update.message_id,
revoke=True
)
return
TRChatBase(update.from_user.id, update.text, "rename")
if (" " in update.text) and (update.reply_to_message is not None):
cmd, file_name = update.text.split(" ", 1)
description = Translation.CUSTOM_CAPTION_UL_FILE
download_location = Config.DOWNLOAD_LOCATION + "/"
a = await bot.send_message(
chat_id=update.chat.id,
text=Translation.DOWNLOAD_START,
@Client.on_message(Filters.command("r", prefix="!") & Filters.reply & ~Filters.edited & Filters.group)
def r(client, message):
    """Apply a ``!r old/new`` substitution to the replied-to message text.

    The text following the ``!r`` command is split on ``/``; every occurrence
    of the first part in the replied-to message's text is replaced with the
    second part, and the command message itself is edited to show the result
    prefixed with ``**You mean:**``.

    The handler does nothing when:
      * no argument follows the command,
      * the argument contains no ``/`` separator,
      * the search part (before the first ``/``) is empty, or
      * the replied-to message has no text.

    Parameters:
        client: Client object used to edit the command message.
        message: The incoming command message; its ``reply_to_message`` is
            the message whose text is corrected.
    """
    if len(message.command) <= 1:
        return

    # Strip the leading "!r" and any whitespace after it to get "old/new".
    colength = len("r") + len("!")
    query = str(message.text)[colength:].lstrip()
    eventsplit = query.split("/")

    # Without a separator there is no replacement part; indexing [1] would
    # raise IndexError.
    if len(eventsplit) < 2:
        return

    # An empty search string would make str.replace insert the replacement
    # between every character, which is never what the user meant.
    if not eventsplit[0]:
        return

    # The replied-to message may carry no text (assumption: e.g. media
    # messages), in which case there is nothing to substitute in.
    replied_text = message.reply_to_message.text
    if replied_text is None:
        return

    result = "**You mean:**\n{}".format(replied_text.replace(eventsplit[0], eventsplit[1]))
    client.edit_message_text(message.chat.id, message.message_id, result)
def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat:
    """Build the chat object for the conversation *message* belongs to.

    Dispatches on the type of ``message.to_id``:

    * ``PeerUser``: looks up a user in *users* and parses it with
      ``parse_user_chat``. The key is ``to_id.user_id`` for outgoing
      messages and ``message.from_id`` otherwise.
    * ``PeerChat``: looks up ``to_id.chat_id`` in *chats* and parses it with
      ``parse_chat_chat``.
    * anything else: looks up ``to_id.channel_id`` in *chats* and parses it
      with ``parse_channel_chat``.
    """
    peer = message.to_id

    if isinstance(peer, types.PeerUser):
        # For outgoing messages the peer is the recipient; otherwise the
        # sender id is used as the lookup key.
        if message.out:
            user_key = peer.user_id
        else:
            user_key = message.from_id
        return parse_user_chat(users[user_key])

    if isinstance(peer, types.PeerChat):
        return parse_chat_chat(chats[peer.chat_id])

    return parse_channel_chat(chats[peer.channel_id])
def _parse(client, dialog: types.Dialog, messages, users, chats) -> "Dialog":
    """Build a ``Dialog`` from a raw ``types.Dialog``.

    A numeric chat id is derived from ``dialog.peer`` and used as the key to
    fetch the top message from *messages*:

    * ``PeerUser``: the user id as-is.
    * ``PeerChat``: the chat id, negated.
    * anything else: the channel id prefixed with ``-100``.

    The chat itself is parsed by ``Chat._parse_dialog`` from the raw peer,
    and the remaining fields are copied from *dialog*.
    """
    peer = dialog.peer

    if isinstance(peer, types.PeerUser):
        lookup_id = peer.user_id
    elif isinstance(peer, types.PeerChat):
        lookup_id = -peer.chat_id
    else:
        lookup_id = int("-100" + str(peer.channel_id))

    parsed_chat = Chat._parse_dialog(client, peer, users, chats)
    # messages.get returns None when no message is stored under this id.
    latest_message = messages.get(lookup_id)

    return Dialog(
        chat=parsed_chat,
        top_message=latest_message,
        unread_messages_count=dialog.unread_count,
        unread_mentions_count=dialog.unread_mentions_count,
        unread_mark=dialog.unread_mark,
        is_pinned=dialog.pinned,
        client=client
    )
# Unsave the animation once it is sent
app.send_animation("me", "animation.gif", unsave=True)
# Keep track of the progress while uploading
def progress(current, total):
print("{:.1f}%".format(current * 100 / total))
app.send_animation("me", "animation.gif", progress=progress)
"""
file = None
try:
if os.path.exists(animation):
thumb = None if thumb is None else self.save_file(thumb)
file = self.save_file(animation, progress=progress, progress_args=progress_args)
media = types.InputMediaUploadedDocument(
mime_type=self.guess_mime_type(animation) or "video/mp4",
file=file,
thumb=thumb,
attributes=[
types.DocumentAttributeVideo(
supports_streaming=True,
duration=duration,
w=width,
h=height
),
types.DocumentAttributeFilename(file_name=os.path.basename(animation)),
types.DocumentAttributeAnimated()
]
)
elif animation.startswith("http"):
media = types.InputMediaDocumentExternal(
send_polls = None
if can_change_info:
change_info = None
if can_invite_users:
invite_users = None
if can_pin_messages:
pin_messages = None
r = self.send(
functions.channels.EditBanned(
channel=self.resolve_peer(chat_id),
user_id=self.resolve_peer(user_id),
banned_rights=types.ChatBannedRights(
until_date=until_date,
send_messages=send_messages,
send_media=send_media,
send_stickers=send_stickers,
send_gifs=send_gifs,
send_games=send_games,
send_inline=send_inline,
embed_links=embed_links,
send_polls=send_polls,
change_info=change_info,
invite_users=invite_users,
pin_messages=pin_messages
)
)
)
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
extra = {}
if tag in ["b", "strong"]:
entity = types.MessageEntityBold
elif tag in ["i", "em"]:
entity = types.MessageEntityItalic
elif tag == "u":
entity = types.MessageEntityUnderline
elif tag in ["s", "del", "strike"]:
entity = types.MessageEntityStrike
elif tag == "blockquote":
entity = types.MessageEntityBlockquote
elif tag == "code":
entity = types.MessageEntityCode
elif tag == "pre":
entity = types.MessageEntityPre
extra["language"] = ""
elif tag == "a":
url = attrs.get("href", "")
def _get_sticker_set_name(send, input_sticker_set_id):
    """Return the short name of a sticker set, or ``None`` if it is invalid.

    Parameters:
        send: Callable that receives the ``GetStickerSet`` request and
            returns the response.
        input_sticker_set_id: Indexable pair whose item ``0`` is the sticker
            set id and item ``1`` is its access hash.

    Returns:
        ``response.set.short_name`` on success, ``None`` when
        ``StickersetInvalid`` is raised.
    """
    try:
        sticker_set_ref = types.InputStickerSetID(
            id=input_sticker_set_id[0],
            access_hash=input_sticker_set_id[1]
        )
        request = functions.messages.GetStickerSet(stickerset=sticker_set_ref)
        response = send(request)
        return response.set.short_name
    except StickersetInvalid:
        return None
try:
decoded = utils.decode(self.audio_file_id)
fmt = " 24 else "