Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_archive_query(self, query_id, jid=None, start=None, end=None, with_=None,
                      after=None, max_=30):
    """
    Start building the IQ 'set' stanza that queries a message archive

    The query namespace is taken from muc_caps_cache.get_mam_namespace(jid);
    when that lookup returns None, self.archiving_namespace is used instead.
    A 'submit' data form is attached to the query element and receives one
    field per filter argument that is truthy.

    NOTE(review): this excerpt stops inside the ``if with_:`` branch. In the
    visible part the 'with' field is created but not yet added to the form,
    query_id, after and max_ are not used, and nothing is returned. Confirm
    against the full method before relying on this description.
    """
    # Muc archive query?
    namespace = muc_caps_cache.get_mam_namespace(jid)
    if namespace is None:
        # Query to our own archive
        namespace = self.archiving_namespace
    iq = nbxmpp.Iq('set', to=jid)
    query = iq.addChild('query', namespace=namespace)
    form = query.addChild(node=nbxmpp.DataForm(typ='submit'))
    # The hidden FORM_TYPE field repeats the namespace used for the query
    field = nbxmpp.DataField(typ='hidden',
                             name='FORM_TYPE',
                             value=namespace)
    form.addChild(node=field)
    if start:
        # A literal 'Z' is appended without any timezone conversion;
        # presumably start is already expressed in UTC - TODO confirm
        field = nbxmpp.DataField(typ='text-single',
                                 name='start',
                                 value=start.strftime('%Y-%m-%dT%H:%M:%SZ'))
        form.addChild(node=field)
    if end:
        # Same formatting (and same UTC assumption) as for start
        field = nbxmpp.DataField(typ='text-single',
                                 name='end',
                                 value=end.strftime('%Y-%m-%dT%H:%M:%SZ'))
        form.addChild(node=field)
    if with_:
        field = nbxmpp.DataField(typ='jid-single', name='with', value=with_)
def respond_e2e_bob(self, form, negotiated, not_acceptable):
    """
    4.3 esession response (bob)

    Builds a 'submit' data form that accepts the session and echoes every
    entry of negotiated except the internal 'send_pubkey' / 'recv_pubkey'
    keys, then stores negotiated on the session.

    NOTE(review): this excerpt stops right after the 'modp' field is read
    from form. In the visible part not_acceptable is unused, the data form
    is not yet attached to the feature element and nothing is sent.
    """
    response = nbxmpp.Message()
    feature = response.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)
    x = nbxmpp.DataForm(typ='submit')
    x.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
                                     value='urn:xmpp:ssn'))
    x.addChild(node=nbxmpp.DataField(name='accept', value='true'))
    for name in negotiated:
        # some fields are internal and should not be sent
        if not name in ('send_pubkey', 'recv_pubkey'):
            x.addChild(node=nbxmpp.DataField(name=name,
                                             value=negotiated[name]))
    self.negotiated = negotiated
    # the offset of the group we chose (need it to match up with the dhhash)
    group_order = 0
    modp_f = form.getField('modp')
def respond_archiving(self, form):
    """
    Answer an archiving negotiation request

    Reads the options offered in the 'logging' field of form, lets
    self.archiving_logging_preference() choose among them, records the
    choice in self.negotiated['logging'] and builds an accepting 'submit'
    data form carrying that choice.

    NOTE(review): this excerpt stops inside the ``if not logging:`` branch,
    before the response is sent. The indentation of the last two lines is
    reconstructed - confirm against the full method.
    """
    field = form.getField('logging')
    # Keep only the second item of each option entry returned by getOptions()
    options = [x[1] for x in field.getOptions()]
    # values is not used in the visible part of the method
    values = field.getValues()
    logging = self.archiving_logging_preference(options)
    self.negotiated['logging'] = logging
    response = nbxmpp.Message()
    feature = response.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)
    x = nbxmpp.DataForm(typ='submit')
    x.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
                                     value='urn:xmpp:ssn'))
    x.addChild(node=nbxmpp.DataField(name='accept', value='true'))
    x.addChild(node=nbxmpp.DataField(name='logging', value=logging))
    self.status = 'responded-archiving'
    feature.addChild(node=x)
    if not logging:
        # No acceptable logging option was chosen: the message built above is
        # replaced by an error built from it, and a new feature node is started
        response = nbxmpp.Error(response, nbxmpp.ERR_NOT_ACCEPTABLE)
        feature = nbxmpp.Node(nbxmpp.NS_FEATURE + ' feature')
def negotiate_e2e(self, sigmai):
    """
    Start building the negotiation request for an end-to-end session

    Resets self.negotiated and fills a data form of type 'form' with the
    required fields: accept, logging (options from
    self.logging_preference()), disclosure ('never' only) and security
    ('e2e' only).

    NOTE(review): this excerpt is cut off in the middle of the 'crypt_algs'
    field, so the last statement is syntactically incomplete here. sigmai
    is not used in the visible part.
    """
    self.negotiated = {}
    request = nbxmpp.Message()
    feature = request.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)
    x = nbxmpp.DataForm(typ='form')
    x.addChild(node=nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
                                     typ='hidden'))
    x.addChild(node=nbxmpp.DataField(name='accept', value='1',
                                     typ='boolean', required=True))
    # this field is incorrectly called 'otr' in XEPs 0116 and 0217
    x.addChild(node=nbxmpp.DataField(name='logging', typ='list-single',
                                     options=self.logging_preference(), required=True))
    # unsupported options: 'disabled', 'enabled'
    x.addChild(node=nbxmpp.DataField(name='disclosure', typ='list-single',
                                     options=['never'], required=True))
    x.addChild(node=nbxmpp.DataField(name='security', typ='list-single',
                                     options=['e2e'], required=True))
    # NOTE(review): the call below is truncated in this excerpt
    x.addChild(node=nbxmpp.DataField(name='crypt_algs', value='aes128-ctr',
def negotiate_archiving(self):
    """
    Build the negotiation request for an archiving session

    Resets self.negotiated and fills a data form of type 'form' with the
    required fields: accept, logging (options from
    self.archiving_logging_preference()), disclosure ('never' only) and
    security ('none' only). The form is attached to the feature element of
    a new message.

    NOTE(review): in this excerpt the request is neither sent nor returned;
    the method is probably cut off after the last visible line - confirm
    against the full source.
    """
    self.negotiated = {}
    request = nbxmpp.Message()
    feature = request.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)
    x = nbxmpp.DataForm(typ='form')
    x.addChild(node=nbxmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
                                     typ='hidden'))
    x.addChild(node=nbxmpp.DataField(name='accept', value='1',
                                     typ='boolean', required=True))
    x.addChild(node=nbxmpp.DataField(name='logging', typ='list-single',
                                     options=self.archiving_logging_preference(), required=True))
    x.addChild(node=nbxmpp.DataField(name='disclosure', typ='list-single',
                                     options=['never'], required=True))
    x.addChild(node=nbxmpp.DataField(name='security', typ='list-single',
                                     options=['none'], required=True))
    feature.addChild(node=x)
"""
'Alice Accepts', continued
"""
# NOTE(review): the ``def`` line of this method is not part of the excerpt and
# the body is cut off after the 'accept' field; self, form and negotiated are
# presumably the method's parameters - confirm against the full source.
# Fixed session parameters chosen for this step
self.encryptable_stanzas = ['message']
self.sas_algs = 'sas28x5'
self.cipher = AES
self.hash_alg = sha256
self.compression = None
self.negotiated = negotiated
accept = nbxmpp.Message()
feature = accept.NT.feature
feature.setNamespace(nbxmpp.NS_FEATURE)
result = nbxmpp.DataForm(typ='result')
# 'counter' is base64-decoded and then parsed with crypto.decode_mpi
self.c_s = crypto.decode_mpi(base64.b64decode(form['counter']))
# c_o is c_s with bit (n - 1) toggled
self.c_o = self.c_s ^ (2 ** (self.n - 1))
self.n_o = base64.b64decode(form['my_nonce'])
# The 'modp' value from the form indexes dh.primes, self.xes and self.es
mod_p = int(form['modp'])
p = dh.primes[mod_p]
x = self.xes[mod_p]
# e is not used in the visible part of the method
e = self.es[mod_p]
self.d = crypto.decode_mpi(base64.b64decode(form['dhkeys']))
self.k = self.get_shared_secret(self.d, x, p)
result.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
                                      value='urn:xmpp:ssn'))
result.addChild(node=nbxmpp.DataField(name='accept', value='1'))
def archiving_accepted(self, form):
    """
    Confirm an archiving session after the peer answered our request

    form is indexed by field name. Its 'logging' value must be one of the
    values returned by self.archiving_logging_preference(); it is then
    stored in self.negotiated['logging'], a 'result' data form accepting
    the session is sent, and the session is marked active and archiving.
    When the negotiated value is 'mustnot', self.loggable is set to False.

    Raises RuntimeError if the peer picked a logging value we did not offer.
    """
    chosen_logging = form['logging']
    if chosen_logging not in self.archiving_logging_preference():
        # This used to be a bare ``raise``. With no exception being handled
        # that produces RuntimeError('No active exception to reraise'), and
        # inside a caller's except block it would re-raise an unrelated
        # exception. Raise the same type explicitly, with a useful message.
        raise RuntimeError(
            'unacceptable logging value in archiving response: %r'
            % (chosen_logging,))

    self.negotiated['logging'] = chosen_logging

    accept = nbxmpp.Message()
    feature = accept.NT.feature
    feature.setNamespace(nbxmpp.NS_FEATURE)

    result = nbxmpp.DataForm(typ='result')
    result.addChild(node=nbxmpp.DataField(name='FORM_TYPE',
                                          value='urn:xmpp:ssn'))
    result.addChild(node=nbxmpp.DataField(name='accept', value='1'))
    feature.addChild(node=result)

    self.send(accept)

    if self.negotiated['logging'] == 'mustnot':
        self.loggable = False
    # Let the logging framework format the message only when it is emitted
    log.debug('archiving session accepted: %s', self.loggable)
    self.status = 'active'
    self.archiving = True
    if self.control:
        self.control.print_archiving_session_details()
def terminate(self, send_termination = True):
    """
    End the session, telling the peer about it when that makes sense

    A termination form is only sent when send_termination is truthy, we
    have already sent something (last_send > 0) and the peer either gave us
    a thread id or has not sent anything yet (last_receive == 0). The
    session status is cleared in every case.
    """
    # only send termination message if we've sent a message and think they
    # have XEP-0201 support
    if send_termination and self.last_send > 0:
        if self.received_thread_id or self.last_receive == 0:
            termination = nbxmpp.Message()
            feature_node = termination.NT.feature
            feature_node.setNamespace(nbxmpp.NS_FEATURE)

            form = nbxmpp.DataForm(typ='submit')
            form_fields = (
                ('FORM_TYPE', 'urn:xmpp:ssn'),
                ('terminate', '1'),
            )
            for field_name, field_value in form_fields:
                form.addChild(node=nbxmpp.DataField(name=field_name,
                                                    value=field_value))
            feature_node.addChild(node=form)

            self.send(termination)

    self.status = None