Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from pytox import Tox, OperationFailedError
except ImportError:
log.exception("Could not start the tox")
log.fatal("""
If you intend to use the Tox backend please install tox:
pip install PyTox
""")
sys.exit(-1)
TOX_MAX_MESS_LENGTH = 1368
NOT_ADMIN = "You are not recognized as an administrator of this bot"
TOX_TO_ERR_STATUS = {
Tox.USER_STATUS_NONE: ONLINE,
Tox.USER_STATUS_AWAY: AWAY,
Tox.USER_STATUS_BUSY: DND,
}
TOX_GROUP_TO_ERR_STATUS = {
Tox.CHAT_CHANGE_PEER_ADD: ONLINE,
Tox.CHAT_CHANGE_PEER_DEL: AWAY,
Tox.CHAT_CHANGE_PEER_NAME: None,
}
class ToxIdentifier(object):
def __init__(self, client_id=None, group_number=None, friend_group_number=None, username=None):
self._client_id = client_id
self._group_number = group_number
self._friend_group_number = friend_group_number
def serve_forever(self):
self.readline_support()
if not self._rooms:
# artificially join a room if None were specified.
self.query_room('#testroom').join()
if self.demo_mode:
# disable the console logging once it is serving in demo mode.
root = logging.getLogger()
root.removeHandler(console_hdlr)
root.addHandler(logging.NullHandler())
self.connect_callback() # notify that the connection occured
self.callback_presence(Presence(identifier=self.user, status=ONLINE))
self.send_message(Message(INTRO))
try:
while True:
if self._inroom:
frm = TextOccupant(self.user, self.rooms[0])
to = self.rooms[0]
else:
frm = self.user
to = self.bot_identifier
print()
full_msg = ''
while True:
def change_presence(self, status: str = ONLINE, message: str = '') -> None:
# It looks like telegram doesn't supports online presence for privacy reason.
pass
def change_presence(self, status: str = ONLINE, message: str = '') -> None:
pass
)
self._bot.query_room(room).configure()
def invite_in_room(self, room, jids_to_invite):
"""
.. deprecated:: 2.2.0
Use the methods on :class:`XMPPMUCRoom` instead.
"""
warnings.warn(
"Using invite_in_room is deprecated, use invite from the "
"MUCRoom class instead.",
DeprecationWarning,
)
self._bot.query_room(room).invite(jids_to_invite)
XMPP_TO_ERR_STATUS = {'available': ONLINE,
'away': AWAY,
'dnd': DND,
'unavailable': OFFLINE}
class XMPPBackend(ErrBot):
def __init__(self, config):
super(XMPPBackend, self).__init__(config)
identity = config.BOT_IDENTITY
self.jid = identity['username'] # backward compatibility
self.password = identity['password']
self.server = identity.get('server', None)
self.feature = config.__dict__.get('XMPP_FEATURE_MECHANISMS', {})
self.keepalive = config.__dict__.get('XMPP_KEEPALIVE_INTERVAL', None)
def change_presence(self, status: str = ONLINE, message: str = '') -> None:
self.api_call('users.setPresence', data={'presence': 'auto' if status == ONLINE else 'away'})
def change_presence(self, status: str = ONLINE, message: str = '') -> None:
pass
def user_joined_chat(self, event):
log.debug("user_join_chat %s" % event)
idd = self.build_identifier(event['from'].full)
p = Presence(chatroom=idd,
nick=idd.resource,
status=ONLINE)
self.callback_presence(p)
def change_presence(self, status: str = ONLINE, message: str = '') -> None:
if status == ONLINE:
self.conn.away() # cancels the away message
else:
self.conn.away('[%s] %s' % (status, message))