Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _recv_data(self, stanza):
    """
    Process one incoming IBB data stanza for this stream.

    The sequence number carried by the stanza must equal the successor
    of ``self.recv_seq``, and the payload may not be longer than
    ``self.block_size``. When either check fails, ``self.close()`` is
    called and an XMPPError is raised. Otherwise the payload is put on
    ``self.recv_queue``, the ``ibb_stream_data`` event is triggered,
    and, if the stanza is an Iq, a reply is sent.
    """
    seq = stanza['ibb_data']['seq']
    # NOTE(review): XEP-0047 describes seq as a 16-bit counter that
    # wraps after 65535, which suggests a modulus of 65536 -- confirm
    # against the sending side before changing.
    expected_seq = (self.recv_seq + 1) % 65535
    if seq != expected_seq:
        self.close()
        raise XMPPError('unexpected-request')
    self.recv_seq = seq

    chunk = stanza['ibb_data']['data']
    if len(chunk) > self.block_size:
        self.close()
        raise XMPPError('not-acceptable')

    self.recv_queue.put_nowait(chunk)
    self.xmpp.event('ibb_stream_data', self)

    # Only Iq stanzas are answered here.
    if isinstance(stanza, Iq):
        ack = stanza.reply()
        ack.send()
self.extension_args = extension_args
def format(self):
    """
    Render the error as a short, human-readable string.

    The error type and condition always appear; the text and the
    extension follow only when they are truthy. The parts are joined
    with ``': '``.
    """
    parts = [self.etype, self.condition]
    optional_parts = (self.text, self.extension)
    parts.extend(part for part in optional_parts if part)
    # TODO: handle self.extension_args
    return ': '.join(parts)
class IqTimeout(XMPPError):

    """
    Raised when the response to an IQ request has not been received
    within the allotted time window.

    The error is created with the ``remote-server-timeout`` condition
    and the ``cancel`` error type.
    """

    def __init__(self, iq):
        super().__init__(etype='cancel', condition='remote-server-timeout')

        #: The :class:`~slixmpp.stanza.iq.Iq` stanza whose response
        #: did not arrive before the timeout expired.
        self.iq = iq
def _handle_request(self, iq):
profile = iq['si']['profile']
sid = iq['si']['id']
if not sid:
raise XMPPError(etype='modify', condition='bad-request')
if profile not in self._profiles:
raise XMPPError(
etype='modify',
condition='bad-request',
extension='bad-profile',
extension_ns=SI.namespace)
neg = iq['si']['feature_neg']['form'].get_fields()
options = neg['stream-method']['options'] or []
methods = []
for opt in options:
methods.append(opt['value'])
for method in methods:
if method in self._methods:
supported = True
break
else:
raise XMPPError('bad-request',
etype='modify',
condition='bad-request',
extension='bad-profile',
extension_ns=SI.namespace)
neg = iq['si']['feature_neg']['form'].get_fields()
options = neg['stream-method']['options'] or []
methods = []
for opt in options:
methods.append(opt['value'])
for method in methods:
if method in self._methods:
supported = True
break
else:
raise XMPPError('bad-request',
extension='no-valid-streams',
extension_ns=SI.namespace)
selected_method = None
log.debug('Available: %s', methods)
for order, method, plugin in self._methods_order:
log.debug('Testing: %s', method)
if method in methods:
selected_method = method
break
receiver = iq['to']
sender = iq['from']
self.api['add_pending'](receiver, sid, sender, {
'response_id': iq['id'],
async def _start(self, event):
    """
    Fetch the bound JID's own vCard and record the hash of its photo.

    The hash is the SHA-1 hex digest of the vCard's PHOTO/BINVAL
    value, or the empty string when that value is empty. It is handed
    to the ``set_hash`` API call. If fetching the vCard raises an
    XMPPError, a debug message is logged and nothing is stored.

    The ``event`` argument is not used.
    """
    try:
        vcard_plugin = self.xmpp['xep_0054']
        vcard = await vcard_plugin.get_vcard(self.xmpp.boundjid.bare)
        photo = vcard['vcard_temp']['PHOTO']['BINVAL']
        new_hash = hashlib.sha1(photo).hexdigest() if photo else ''
        self.api['set_hash'](self.xmpp.boundjid, args=new_hash)
    except XMPPError:
        log.debug('Could not retrieve vCard for %s', self.xmpp.boundjid.bare)
def get_data(self):
    """
    Decode and return the base64 payload held in this element's text.

    The text is stripped of surrounding whitespace and must be matched
    in full by ``VALID_B64`` before it is passed to ``from_b64``.

    Returns:
        Whatever ``from_b64`` returns for the stripped text.

    Raises:
        XMPPError: ``not-acceptable`` if the element has no text, or if
            the stripped text is not matched in full by ``VALID_B64``.
    """
    text = self.xml.text
    if not text:
        raise XMPPError('not-acceptable', 'IBB data element is empty.')

    b64_data = text.strip()
    # Pattern.match() returns None when nothing matches at the start of
    # the string; treat that as invalid data instead of letting an
    # AttributeError escape from .group().
    found = VALID_B64.match(b64_data)
    if found is None or found.group() != b64_data:
        raise XMPPError('not-acceptable')
    return from_b64(b64_data)
print('Could not find file: %s' % self.filepath)
return self.disconnect()
avatar = avatar_file.read()
avatar_type = 'image/%s' % imghdr.what('', avatar)
avatar_id = self['xep_0084'].generate_id(avatar)
avatar_bytes = len(avatar)
avatar_file.close()
used_xep84 = False
print('Publish XEP-0084 avatar data')
result = await self['xep_0084'].publish_avatar(avatar)
if isinstance(result, XMPPError):
print('Could not publish XEP-0084 avatar')
else:
used_xep84 = True
print('Update vCard with avatar')
result = await self['xep_0153'].set_avatar(avatar=avatar, mtype=avatar_type)
if isinstance(result, XMPPError):
print('Could not set vCard avatar')
if used_xep84:
print('Advertise XEP-0084 avatar metadata')
result = await self['xep_0084'].publish_avatar_metadata([
{'id': avatar_id,
'type': avatar_type,
'bytes': avatar_bytes}
# We could advertise multiple avatars to provide
avatar_bytes = len(avatar)
avatar_file.close()
used_xep84 = False
print('Publish XEP-0084 avatar data')
result = await self['xep_0084'].publish_avatar(avatar)
if isinstance(result, XMPPError):
print('Could not publish XEP-0084 avatar')
else:
used_xep84 = True
print('Update vCard with avatar')
result = await self['xep_0153'].set_avatar(avatar=avatar, mtype=avatar_type)
if isinstance(result, XMPPError):
print('Could not set vCard avatar')
if used_xep84:
print('Advertise XEP-0084 avatar metadata')
result = await self['xep_0084'].publish_avatar_metadata([
{'id': avatar_id,
'type': avatar_type,
'bytes': avatar_bytes}
# We could advertise multiple avatars to provide
# options in image type, source (HTTP vs pubsub),
# size, etc.
# {'id': ....}
])
if isinstance(result, XMPPError):
print('Could not publish XEP-0084 metadata')
async def on_avatar(self, msg):
    """
    Save to disk the avatars advertised in an avatar metadata event.

    For every advertised entry without a URL, the avatar data is
    retrieved through the ``xep_0084`` plugin (5 second timeout) and
    written to ``avatar_<bare jid>_<id>.<ext>`` in the current working
    directory. The extension comes from ``FILE_TYPES`` keyed by the
    entry's own type, falling back to ``png``. Entries that carry a
    URL are skipped.

    If retrieving an avatar raises an XMPPError, a message is printed
    and the handler returns without processing the remaining entries.
    """
    print("Received avatar update from %s" % msg['from'])
    metadata = msg['pubsub_event']['items']['item']['avatar_metadata']
    for info in metadata['items']:
        if info['url']:
            # We could retrieve the avatar via HTTP, etc here instead.
            continue

        try:
            result = await self['xep_0084'].retrieve_avatar(
                msg['from'].bare, info['id'], timeout=5)
        except XMPPError:
            print("Error retrieving avatar for %s" % msg['from'])
            return

        avatar = result['pubsub']['items']['item']['avatar_data']

        # The type is declared on each advertised entry, so read it from
        # the entry being saved rather than from the metadata container.
        filetype = FILE_TYPES.get(info['type'], 'png')
        # NOTE(review): info['id'] comes from the received event and is
        # placed in the filename as-is -- confirm whether it needs to be
        # sanitized before use as a path component.
        filename = 'avatar_%s_%s.%s' % (msg['from'].bare, info['id'], filetype)
        with open(filename, 'wb+') as img:
            img.write(avatar['value'])
if session:
handler = session['next']
interfaces = session['interfaces']
results = []
for stanza in iq['command']['substanzas']:
if stanza.plugin_attrib in interfaces:
results.append(stanza)
if len(results) == 1:
results = results[0]
session = handler(results, session)
self._process_command_response(iq, session)
else:
raise XMPPError('item-not-found')