Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_cert_request(con, to_jid):
    """Send a public-key request IQ to *to_jid* and return its id.

    Builds an IQ stanza of type ``get`` addressed to *to_jid*, stamps it
    with a freshly generated id, attaches a ``pubkeys`` child element in
    the ``Namespace.PUBKEY_PUBKEY`` namespace and sends it through
    ``con.connection``.

    Returns the generated id converted to ``str``.
    """
    request = nbxmpp.Iq('get', to=to_jid)
    request_id = generate_id()
    request.setAttr('id', request_id)
    # The child element only carries the namespace; it has no payload.
    request.setTag('pubkeys').setNamespace(Namespace.PUBKEY_PUBKEY)
    con.connection.send(request)
    return str(request_id)
'port': port,
'type': 'direct',
'jid': self.ourjid,
'priority': priority
}
hosts.add(my_ip)
local_ip_cand.append(candidate)
try:
for addrinfo in socket.getaddrinfo(socket.gethostname(), None):
addr = addrinfo[4][0]
if not addr in hosts and not addr.startswith('127.') and \
addr != '::1':
candidate = {
'host': addr,
'candidate_id': generate_id(),
'port': port,
'type': 'direct',
'jid': self.ourjid,
'priority': priority,
'initiator': self.file_props.sender,
'target': self.file_props.receiver
}
hosts.add(addr)
local_ip_cand.append(candidate)
except socket.gaierror:
pass # ignore address-related errors for getaddrinfo
try:
from netifaces import interfaces, ifaddresses, AF_INET, AF_INET6
for ifaceName in interfaces():
addresses = ifaddresses(ifaceName)
def _get_query_id(self, jid):
    """Generate a new query id, record it for *jid*, and return it.

    The id is stored in ``self._mam_query_ids`` keyed by *jid*, replacing
    any id previously stored under that key.
    """
    self._mam_query_ids[jid] = fresh_id = generate_id()
    return fresh_id
def add_stanza(self, stanza, is_message=False):
    """Send *stanza* now, or queue it until a connection exists.

    When ``self.Connection`` is falsy the ``(stanza, is_message)`` pair is
    appended to ``self.stanzaqueue``; otherwise the stanza is handed to
    ``self.send``. If the connection's ``state`` is ``-1`` nothing is
    sent or recorded and ``False`` is returned.

    For messages, the pair ``(id, thread_id)`` is additionally appended to
    ``self.conn_holder.ids_of_awaiting_messages`` under ``self.fd``. A new
    id is generated when the stanza does not carry one.

    Returns ``True`` unless the connection state is ``-1``.
    """
    if not self.Connection:
        # No connection object yet: keep the stanza for later.
        self.stanzaqueue.append((stanza, is_message))
    elif self.Connection.state == -1:
        return False
    else:
        self.send(stanza, is_message)

    if not is_message:
        return True

    thread_id = stanza.getThread()
    # Fall back to a generated id when the stanza has none; note that the
    # generated id is only recorded here, it is not written to the stanza.
    id_ = stanza.getID() or generate_id()
    awaiting = self.conn_holder.ids_of_awaiting_messages
    awaiting.setdefault(self.fd, []).append((id_, thread_id))
    return True
def set_connection(self, conn):
    """Store *conn* as ``self.connection`` and ensure a session id exists.

    An existing truthy ``self.sid`` is kept; otherwise a new one is
    generated.
    """
    self.connection = conn
    if self.sid:
        return
    self.sid = generate_id()
def build_message_stanza(self, message):
own_jid = self._con.get_own_jid()
stanza = nbxmpp.Message(to=message.jid,
body=message.message,
typ=message.type_,
subject=message.subject,
xhtml=message.xhtml)
if message.correct_id:
stanza.setTag('replace', attrs={'id': message.correct_id},
namespace=Namespace.CORRECT)
# XEP-0359
message.message_id = generate_id()
stanza.setID(message.message_id)
stanza.setOriginID(message.message_id)
if message.label:
stanza.addChild(node=message.label)
# XEP-0172: user_nickname
if message.user_nick:
stanza.setTag('nick', namespace=Namespace.NICK).setData(
message.user_nick)
# XEP-0203
# TODO: Seems delayed is not set anywhere
if message.delayed:
timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ',
time.gmtime(message.delayed))