Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Truncated snippet (indentation was lost in extraction): serializes the
# client's session state to JSON and writes it to the `temporary` path.
# The auth key is base64-encoded, then split into chunks of at most 43 characters.
auth_key = base64.b64encode(client.auth_key).decode()
auth_key = [auth_key[i: i + 43] for i in range(0, len(auth_key), 43)]
# Session payload: connection/account fields plus the three peer caches.
data = dict(
dc_id=client.dc_id,
test_mode=client.test_mode,
auth_key=auth_key,
user_id=client.user_id,
# Timestamp of this write, in whole seconds.
date=int(time.time()),
is_bot=bool(client.is_bot),
# Cached peer key -> its access_hash (None when the peer has no such attribute).
# The cache is copied before iterating.
peers_by_id={
k: getattr(v, "access_hash", None)
for k, v in client.peers_by_id.copy().items()
},
# Username -> peer id as computed by utils.get_peer_id.
peers_by_username={
k: utils.get_peer_id(v)
for k, v in client.peers_by_username.copy().items()
},
# Phone -> peer id as computed by utils.get_peer_id.
peers_by_phone={
k: utils.get_peer_id(v)
for k, v in client.peers_by_phone.copy().items()
}
)
# Create the working directory if it does not exist yet.
os.makedirs(client.workdir, exist_ok=True)
with open(temporary, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
# Flush Python's buffer, then fsync the file descriptor, before the file is closed.
f.flush()
os.fsync(f.fileno())
# NOTE(review): the matching `try:` and this handler's body are outside the visible snippet.
except Exception as e:
# Truncated snippet (indentation was lost in extraction): chooses the `media`
# input for a document, then sends it with messages.SendMedia.
# NOTE(review): the condition guarding this first branch is not visible;
# presumably it checks that `document` is a local file path - confirm.
# Upload the optional thumbnail and the document itself.
thumb = None if thumb is None else self.save_file(thumb)
file = self.save_file(document, progress=progress, progress_args=progress_args)
media = types.InputMediaUploadedDocument(
# Falls back to "application/zip" when no MIME type can be guessed.
mime_type=self.guess_mime_type(document) or "application/zip",
file=file,
thumb=thumb,
attributes=[
types.DocumentAttributeFilename(file_name=os.path.basename(document))
]
)
# A string starting with "http" is passed through as an external URL.
elif document.startswith("http"):
media = types.InputMediaDocumentExternal(
url=document
)
# Otherwise `document` is handed to utils.get_input_media_from_file_id.
# NOTE(review): the literal 5 is presumably the expected media type code - confirm.
else:
media = utils.get_input_media_from_file_id(document, file_ref, 5)
# NOTE(review): the loop exit and the FilePartMissing handler body are outside this snippet.
while True:
try:
r = self.send(
functions.messages.SendMedia(
peer=self.resolve_peer(chat_id),
media=media,
# A falsy disable_notification is sent as None rather than False.
silent=disable_notification or None,
reply_to_msg_id=reply_to_message_id,
random_id=self.rnd_id(),
schedule_date=schedule_date,
reply_markup=reply_markup.write() if reply_markup else None,
# The parsed caption is expanded into keyword arguments of the request.
**self.parser.parse(caption, parse_mode)
)
)
except FilePartMissing as e:
# Truncated snippet (indentation was lost in extraction): the opening of the
# call these first arguments belong to is not visible.
peer=self.resolve_peer(chat_id),
media=types.InputMediaPhotoExternal(
url=i.media
)
)
)
# Re-wrap the photo carried by `media` as an InputMediaPhoto that references
# it by id, access hash and file reference.
# NOTE(review): `media` is presumably the result of the call closed just above - confirm.
media = types.InputMediaPhoto(
id=types.InputPhoto(
id=media.photo.id,
access_hash=media.photo.access_hash,
file_reference=media.photo.file_reference
)
)
# Fallback branch: i.media and i.file_ref go to utils.get_input_media_from_file_id.
# NOTE(review): the literal 2 is presumably the expected media type code - confirm.
else:
media = utils.get_input_media_from_file_id(i.media, i.file_ref, 2)
# Video items: when i.media is an existing local path, upload it via messages.UploadMedia.
elif isinstance(i, pyrogram.InputMediaVideo):
if os.path.exists(i.media):
while True:
try:
media = self.send(
functions.messages.UploadMedia(
peer=self.resolve_peer(chat_id),
media=types.InputMediaUploadedDocument(
file=self.save_file(i.media),
# The thumbnail is uploaded only when one was provided.
thumb=None if i.thumb is None else self.save_file(i.thumb),
# Falls back to "video/mp4" when no MIME type can be guessed.
mime_type=self.guess_mime_type(i.media) or "video/mp4",
attributes=[
types.DocumentAttributeVideo(
# A falsy supports_streaming is sent as None rather than False.
supports_streaming=i.supports_streaming or None,
duration=i.duration,
w=i.width,
# Truncated snippet: collects file metadata into a FileData record.
# `date` and `file_ref` are optional on `media`; missing attributes become None.
date = getattr(media, "date", None)
file_ref = getattr(media, "file_ref", None)
# media_file_name, file_size and mime_type are defined outside the visible snippet.
data = FileData(
file_name=media_file_name,
file_size=file_size,
mime_type=mime_type,
date=date,
file_ref=file_ref
)
def get_existing_attributes() -> dict:
    """Return the attributes of ``data`` whose values are not None.

    ``data`` is read from the enclosing scope; attribute order follows
    ``data.__dict__``.
    """
    return {
        attr_name: attr_value
        for attr_name, attr_value in data.__dict__.items()
        if attr_value is not None
    }
# Truncated snippet: decodes a file id string and branches on its media type.
# NOTE(review): the handler for this `try` is outside the visible snippet.
try:
decoded = utils.decode_file_id(file_id_str)
# The first element of the decoded value is used as the media type.
media_type = decoded[0]
# NOTE(review): the meaning of media type 1 is not shown here - confirm against utils.
if media_type == 1:
unpacked = struct.unpack("
# Truncated snippet (indentation was lost in extraction): keyword arguments of
# a call whose opening is not visible, followed by a write-then-move save of
# the resulting `data` as JSON.
dc_id=client.dc_id,
test_mode=client.test_mode,
auth_key=auth_key,
user_id=client.user_id,
# Timestamp of this write, in whole seconds.
date=int(time.time()),
is_bot=bool(client.is_bot),
# Cached peer key -> its access_hash (None when the peer has no such attribute).
# The cache is copied before iterating.
peers_by_id={
k: getattr(v, "access_hash", None)
for k, v in client.peers_by_id.copy().items()
},
# Username -> peer id as computed by utils.get_peer_id.
peers_by_username={
k: utils.get_peer_id(v)
for k, v in client.peers_by_username.copy().items()
},
# Phone -> peer id as computed by utils.get_peer_id.
peers_by_phone={
k: utils.get_peer_id(v)
for k, v in client.peers_by_phone.copy().items()
}
)
# Create the working directory if it does not exist yet.
os.makedirs(client.workdir, exist_ok=True)
# Write to the temporary path first; the persistent path is only touched on success.
with open(temporary, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
# Flush Python's buffer, then fsync the file descriptor, before the file is closed.
f.flush()
os.fsync(f.fileno())
# Any failure is logged with its traceback and not re-raised.
# NOTE(review): the matching `try:` is outside the visible snippet.
except Exception as e:
log.critical(e, exc_info=True)
# Success path: move the temporary file over the persistent one and log the sync.
else:
shutil.move(temporary, persistent)
log.info("Synced {}".format(client.session_name))
# Truncated snippet (indentation was lost in extraction): the start of this
# expression is not visible; it falls back to update.channel_id (or None).
) or getattr(update, "channel_id", None)
# pts / pts_count are optional on the update; missing attributes become None.
pts = getattr(update, "pts", None)
pts_count = getattr(update, "pts_count", None)
# UpdateChannelTooLong is only logged as a warning here.
if isinstance(update, types.UpdateChannelTooLong):
log.warning(update)
# For a new channel message with `is_min` set, fetch the channel difference
# restricted to that single message id.
# NOTE(review): `is_min` is computed outside the visible snippet.
if isinstance(update, types.UpdateNewChannelMessage) and is_min:
message = update.message
if not isinstance(message, types.MessageEmpty):
try:
diff = self.send(
functions.updates.GetChannelDifference(
channel=self.resolve_peer(utils.get_channel_id(channel_id)),
filter=types.ChannelMessagesFilter(
# min_id and max_id are both the id of this message.
ranges=[types.MessageRange(
min_id=update.message.id,
max_id=update.message.id
)]
),
pts=pts - pts_count,
limit=pts
)
)
# A ChannelPrivate error is deliberately ignored.
except ChannelPrivate:
pass
else:
# A non-empty difference contributes its users and chats, keyed by id,
# to the `users` / `chats` mappings defined outside this snippet.
if not isinstance(diff, types.updates.ChannelDifferenceEmpty):
users.update({u.id: u for u in diff.users})
chats.update({c.id: c for c in diff.chats})
# Truncated snippet (indentation was lost in extraction): keyword arguments of
# a request whose opening is not visible, followed by handling of its result `r`.
peer=self.resolve_peer(chat_id),
# Falsy flags are sent as None rather than False.
no_webpage=disable_web_page_preview or None,
silent=disable_notification or None,
reply_to_msg_id=reply_to_message_id,
random_id=self.rnd_id(),
reply_markup=reply_markup.write() if reply_markup else None,
# The parsed text is expanded into keyword arguments of the request.
**style.parse(text)
)
)
# An UpdateShortSentMessage result is turned into a Message built from the
# id, date, out flag and entities of `r`.
if isinstance(r, types.UpdateShortSentMessage):
return pyrogram_types.Message(
message_id=r.id,
date=r.date,
outgoing=r.out,
entities=utils.parse_entities(r.entities, {}) or None
)
# Otherwise return the parsed message of the first new-message update in r.updates.
for i in r.updates:
if isinstance(i, (types.UpdateNewMessage, types.UpdateNewChannelMessage)):
return utils.parse_messages(
self, i.message,
# Users and chats from `r`, keyed by id. The comprehension's `i` is local
# to the comprehension and does not rebind the loop variable.
{i.id: i for i in r.users},
{i.id: i for i in r.chats}
)