Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def negotiate_archiving(self):
# Builds a session-negotiation request form for an archiving session and
# records the new status on the session object.
# NOTE(review): this copy ends after setting self.status and never sends
# `request`; it looks like a truncated excerpt -- confirm against the full
# source. Indentation has also been lost in this excerpt.
# Start from an empty record of negotiated values.
self.negotiated = {}
request = nbxmpp.Message()
# The form is attached under the message's `feature` element, which is put
# in the nbxmpp.NS_FEATURE namespace.
feature = request.NT.feature
feature.setNamespace(nbxmpp.NS_FEATURE)
x = nbxmpp.DataForm(typ='form')
# Hidden FORM_TYPE field identifying the form as 'urn:xmpp:ssn'.
x.addChild(node=nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='accept', value='1',
typ='boolean', required=True))
# The offered logging options come from self.archiving_logging_preference().
x.addChild(node=nbxmpp.DataField(name='logging', typ='list-single',
options=self.archiving_logging_preference(), required=True))
# Only a single choice is offered for disclosure ('never') and for
# security ('none').
x.addChild(node=nbxmpp.DataField(name='disclosure', typ='list-single',
options=['never'], required=True))
x.addChild(node=nbxmpp.DataField(name='security', typ='list-single',
options=['none'], required=True))
feature.addChild(node=x)
self.status = 'requested-archiving'
# NOTE(review): headerless fragment -- the enclosing `def` is not visible, and
# `mac_s` / `old_c_s` are defined outside this excerpt. Indentation has been
# lost, so the nesting of the `if` statements below cannot be determined from
# here; confirm against the full source.
id_s = self.encrypt(mac_s)
# m_s is self.hmac() keyed with self.km_s over the MPI-encoded old_c_s
# concatenated with id_s.
m_s = self.hmac(self.km_s, crypto.encode_mpi(old_c_s) + id_s)
if self.status == 'requested-e2e' and self.sas_algs == 'sas28x5':
# we're alice; check for a retained secret
# if none exists, prompt the user with the SAS
self.sas = crypto.sas_28x5(m_s, self.form_o.encode('utf-8'))
if self.sigmai:
# FIXME save retained secret?
# NOTE(review): this passes the name `tuple` (the builtin type unless it is
# rebound somewhere outside this excerpt) -- confirm that is intended.
self.check_identity(tuple)
# Returns a pair of data fields, 'identity' and 'mac', whose values are the
# base64 encodings of id_s and m_s decoded to text.
return (nbxmpp.DataField(name='identity',
value=base64.b64encode(id_s).decode('utf-8')),
nbxmpp.DataField(name='mac',
value=base64.b64encode(m_s).decode('utf-8')))
# NOTE(review): headerless fragment -- `request` is created outside this
# excerpt. The statements below populate a 'urn:xmpp:ssn' data form with the
# fields of an end-to-end (security 'e2e') negotiation request. Nothing in the
# visible lines attaches `x` to `feature` or sends the request.
feature = request.NT.feature
feature.setNamespace(nbxmpp.NS_FEATURE)
x = nbxmpp.DataForm(typ='form')
x.addChild(node=nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='accept', value='1',
typ='boolean', required=True))
# this field is incorrectly called 'otr' in XEPs 0116 and 0217
x.addChild(node=nbxmpp.DataField(name='logging', typ='list-single',
options=self.logging_preference(), required=True))
# unsupported options: 'disabled', 'enabled'
x.addChild(node=nbxmpp.DataField(name='disclosure', typ='list-single',
options=['never'], required=True))
x.addChild(node=nbxmpp.DataField(name='security', typ='list-single',
options=['e2e'], required=True))
x.addChild(node=nbxmpp.DataField(name='crypt_algs', value='aes128-ctr',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='hash_algs', value='sha256',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='compress', value='none',
typ='hidden'))
# unsupported options: 'iq', 'presence'
x.addChild(node=nbxmpp.DataField(name='stanzas', typ='list-multi',
options=['message']))
x.addChild(node=nbxmpp.DataField(name='init_pubkey', options=['none',
'key', 'hash'], typ='list-single'))
# NOTE(review): the next line is a dangling continuation -- no opening call
# precedes it -- and the fields that follow repeat the ones added above. This
# looks like two overlapping excerpts joined together; confirm against the
# full source before relying on it.
options=self.logging_preference(), required=True))
# unsupported options: 'disabled', 'enabled'
x.addChild(node=nbxmpp.DataField(name='disclosure', typ='list-single',
options=['never'], required=True))
x.addChild(node=nbxmpp.DataField(name='security', typ='list-single',
options=['e2e'], required=True))
x.addChild(node=nbxmpp.DataField(name='crypt_algs', value='aes128-ctr',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='hash_algs', value='sha256',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='compress', value='none',
typ='hidden'))
# unsupported options: 'iq', 'presence'
x.addChild(node=nbxmpp.DataField(name='stanzas', typ='list-multi',
options=['message']))
x.addChild(node=nbxmpp.DataField(name='init_pubkey', options=['none',
'key', 'hash'], typ='list-single'))
# FIXME store key, use hash
x.addChild(node=nbxmpp.DataField(name='resp_pubkey', options=['none',
'key'], typ='list-single'))
# Fixed hidden parameters: version '1.0', rekey frequency '4294967295'
# (the string form of 2**32 - 1) and SAS algorithm 'sas28x5'.
x.addChild(node=nbxmpp.DataField(name='ver', value='1.0', typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='rekey_freq', value='4294967295',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='sas_algs', value='sas28x5',
typ='hidden'))
def negotiate_e2e(self, sigmai):
# Begins building a 'urn:xmpp:ssn' negotiation request whose only offered
# security option is 'e2e'.
# NOTE(review): this excerpt is cut off in the middle of the final addChild
# call (the 'compress' field is never closed), and the `sigmai` parameter is
# not used in the visible lines. Indentation has also been lost. Confirm
# against the full source.
# Start from an empty record of negotiated values.
self.negotiated = {}
request = nbxmpp.Message()
feature = request.NT.feature
feature.setNamespace(nbxmpp.NS_FEATURE)
x = nbxmpp.DataForm(typ='form')
x.addChild(node=nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='accept', value='1',
typ='boolean', required=True))
# this field is incorrectly called 'otr' in XEPs 0116 and 0217
x.addChild(node=nbxmpp.DataField(name='logging', typ='list-single',
options=self.logging_preference(), required=True))
# unsupported options: 'disabled', 'enabled'
x.addChild(node=nbxmpp.DataField(name='disclosure', typ='list-single',
options=['never'], required=True))
x.addChild(node=nbxmpp.DataField(name='security', typ='list-single',
options=['e2e'], required=True))
x.addChild(node=nbxmpp.DataField(name='crypt_algs', value='aes128-ctr',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='hash_algs', value='sha256',
typ='hidden'))
x.addChild(node=nbxmpp.DataField(name='compress', value='none',
def respond_archiving(self, form):
# Answers an archiving negotiation request: picks a logging value from the
# options offered in `form` and builds a 'submit' form carrying it.
# NOTE(review): this excerpt is truncated -- it ends while the error reply is
# still being built and never sends anything. Indentation has also been
# lost, so the extent of the `if not logging:` branch cannot be determined
# from here; confirm against the full source.
field = form.getField('logging')
# Keep the second element of each entry returned by getOptions().
options = [x[1] for x in field.getOptions()]
# NOTE(review): `values` is not used in the visible lines.
values = field.getValues()
# The chosen value comes from self.archiving_logging_preference(options);
# it is treated as "nothing acceptable" below when falsy.
logging = self.archiving_logging_preference(options)
self.negotiated['logging'] = logging
response = nbxmpp.Message()
feature = response.NT.feature
feature.setNamespace(nbxmpp.NS_FEATURE)
x = nbxmpp.DataForm(typ='submit')
x.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
value='urn:xmpp:ssn'))
x.addChild(node=nbxmpp.DataField(name='accept', value='true'))
x.addChild(node=nbxmpp.DataField(name='logging', value=logging))
self.status = 'responded-archiving'
feature.addChild(node=x)
# No acceptable logging value: replace the response with an
# ERR_NOT_ACCEPTABLE error and start a feature node naming the offending
# 'logging' field.
if not logging:
response = nbxmpp.Error(response, nbxmpp.ERR_NOT_ACCEPTABLE)
feature = nbxmpp.Node(nbxmpp.NS_FEATURE + ' feature')
n = nbxmpp.Node('field')
n['var'] = 'logging'
def negotiate_archiving(self):
    """Send a negotiation request for an archiving session.

    Clears ``self.negotiated``, builds a ``urn:xmpp:ssn`` data form of type
    ``'form'`` with the fields FORM_TYPE, accept, logging, disclosure and
    security, attaches it to the message's ``feature`` element, sets
    ``self.status`` to ``'requested-archiving'`` and passes the message to
    ``self.send``.
    """
    self.negotiated = {}

    request = nbxmpp.Message()
    feature = request.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)

    form = nbxmpp.DataForm(typ='form')
    # Fields are listed in the order they are appended to the form.
    fields = (
        nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
                         typ='hidden'),
        nbxmpp.DataField(name='accept', value='1', typ='boolean',
                         required=True),
        # Offered logging options come from the archiving preference.
        nbxmpp.DataField(name='logging', typ='list-single',
                         options=self.archiving_logging_preference(),
                         required=True),
        nbxmpp.DataField(name='disclosure', typ='list-single',
                         options=['never'], required=True),
        nbxmpp.DataField(name='security', typ='list-single',
                         options=['none'], required=True),
    )
    for field in fields:
        form.addChild(node=field)

    feature.addChild(node=form)

    self.status = 'requested-archiving'
    self.send(request)
# NOTE(review): headerless fragment -- no enclosing `def` is visible and
# indentation has been lost, so the nesting of the `if` statements below
# cannot be determined from here. `form` and `log` are defined outside this
# excerpt.
# NOTE(review): the three locals below are not used in the visible lines;
# they may belong to a different excerpt -- confirm against the full source.
negotiated = {}
ask_user = {}
not_acceptable = []
# Refuse a 'logging' value that is not among the archiving preferences.
# NOTE(review): a bare `raise` outside an `except` block fails with
# "RuntimeError: No active exception to re-raise" -- confirm whether an
# enclosing handler exists or a specific exception was intended.
if form['logging'] not in self.archiving_logging_preference():
raise
self.negotiated['logging'] = form['logging']
# Build and send a 'result' form with accept='1'.
accept = nbxmpp.Message()
feature = accept.NT.feature
feature.setNamespace(nbxmpp.NS_FEATURE)
result = nbxmpp.DataForm(typ='result')
result.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
value='urn:xmpp:ssn'))
result.addChild(node=nbxmpp.DataField(name='accept', value='1'))
feature.addChild(node=result)
self.send(accept)
# A negotiated logging value of 'mustnot' turns off self.loggable.
if self.negotiated['logging'] == 'mustnot':
self.loggable = False
log.debug('archiving session accepted: %s' % self.loggable)
self.status = 'active'
self.archiving = True
# If a control is attached, ask it to print the archiving session details.
if self.control:
self.control.print_archiving_session_details()
def reject_negotiation(self, body=None):
    """Send a negotiation rejection and cancel the negotiation locally.

    Builds a ``urn:xmpp:ssn`` data form of type ``'submit'`` with
    ``accept`` set to ``'0'``, attaches it to the message's ``feature``
    element, sends the message with ``self.send`` and then calls
    ``self.cancelled_negotiation()``.

    :param body: optional message body; set on the message only when truthy.
    """
    rejection = nbxmpp.Message()
    feature = rejection.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)

    form = nbxmpp.DataForm(typ='submit')
    form_type = nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn')
    form.addChild(node=form_type)
    refusal = nbxmpp.DataField(name='accept', value='0')
    form.addChild(node=refusal)

    feature.addChild(node=form)

    if body:
        rejection.setBody(body)

    self.send(rejection)
    self.cancelled_negotiation()
def on_cancel_button_clicked(self, widget):
    """Handle a click on the cancel button by rejecting the negotiation.

    Builds a message addressed to ``self.jid`` on the session's thread,
    carrying a ``urn:xmpp:ssn`` data form of type ``'submit'`` with the
    boolean ``accept`` field set to ``'false'``. The message is sent through
    this account's connection and ``self.window`` is then destroyed.

    :param widget: the widget that emitted the signal (unused).
    """
    msg = nbxmpp.Message(self.jid)
    msg.setThread(self.session.thread_id)

    feature = msg.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)

    form = nbxmpp.DataForm(typ='submit')
    form_type = nbxmpp.DataField('FORM_TYPE', value='urn:xmpp:ssn')
    form.addChild(node=form_type)
    refusal = nbxmpp.DataField('accept', value='false', typ='boolean')
    form.addChild(node=refusal)
    feature.addChild(node=form)

    connection = app.connections[self.account]
    connection.send_stanza(msg)

    self.window.destroy()