Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def build_http_auth_answer(self, stanza, answer):
if answer == 'yes':
self._log.info('Auth request approved')
confirm = stanza.getTag('confirm')
reply = stanza.buildReply('result')
if stanza.getName() == 'message':
reply.addChild(node=confirm)
self._con.connection.send(reply)
elif answer == 'no':
self._log.info('Auth request denied')
err = nbxmpp.Error(stanza, nbxmpp.protocol.ERR_NOT_AUTHORIZED)
self._con.connection.send(err)
def badRequest(self, stanza):
self.connection.connection.send(nbxmpp.Error(stanza,
nbxmpp.NS_STANZAS + ' bad-request'))
def _CommandExecuteCB(self, con, iq_obj):
jid = helpers.get_full_jid_from_iq(iq_obj)
cmd = iq_obj.getTag('command')
if cmd is None: return
node = cmd.getAttr('node')
if node is None: return
sessionid = cmd.getAttr('sessionid')
if sessionid is None:
# we start a new command session... only if we are visible for the jid
# and command exist
if node not in self.__commands.keys():
self.connection.send(
nbxmpp.Error(iq_obj, nbxmpp.NS_STANZAS + ' item-not-found'))
raise nbxmpp.NodeProcessed
newcmd = self.__commands[node]
if not newcmd.isVisibleFor(self.isSameJID(jid)):
return
# generate new sessionid
sessionid = self.connection.getAnID()
# create new instance and run it
obj = newcmd(conn = self, jid = jid, sessionid = sessionid)
rc = obj.execute(iq_obj)
if rc:
self.__sessions[(jid, sessionid, node)] = obj
raise nbxmpp.NodeProcessed
else:
def __send_error(self, stanza, error, jingle_error=None, text=None, type_=None):
err_stanza = nbxmpp.Error(stanza, '%s %s' % (Namespace.STANZAS, error))
err = err_stanza.getTag('error')
if type_:
err.setAttr('type', type_)
if jingle_error:
err.setTag(jingle_error, namespace=Namespace.JINGLE_ERRORS)
if text:
err.setTagData('text', text)
self.connection.connection.send(err_stanza)
self.__dispatch_error(jingle_error or error, text, type_)
def bad_request(self, stanza):
self.connection.connection.send(
nbxmpp.Error(stanza, Namespace.STANZAS + ' bad-request'))
feature.setNamespace(nbxmpp.NS_FEATURE)
x = nbxmpp.DataForm(typ='submit')
x.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
value='urn:xmpp:ssn'))
x.addChild(node=nbxmpp.DataField(name='accept', value='true'))
x.addChild(node=nbxmpp.DataField(name='logging', value=logging))
self.status = 'responded-archiving'
feature.addChild(node=x)
if not logging:
response = nbxmpp.Error(response, nbxmpp.ERR_NOT_ACCEPTABLE)
feature = nbxmpp.Node(nbxmpp.NS_FEATURE + ' feature')
n = nbxmpp.Node('field')
n['var'] = 'logging'
feature.addChild(node=n)
response.T.error.addChild(node=feature)
self.send(response)
def fail_bad_negotiation(self, reason, fields=None):
"""
Send an error and cancels everything
If fields is None, the remote party has given us a bad cryptographic
value of some kind. Otherwise, list the fields we haven't implemented.
"""
err = nbxmpp.Error(nbxmpp.Message(), nbxmpp.ERR_FEATURE_NOT_IMPLEMENTED)
err.T.error.T.text.setData(reason)
if fields:
feature = nbxmpp.Node(nbxmpp.NS_FEATURE + ' feature')
for field in fields:
fn = nbxmpp.Node('field')
fn['var'] = field
feature.addChild(node=feature)
err.addChild(node=feature)
self.send(err)
self.status = None
self.enable_encryption = False