Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def plugin_init(self):
self.xmpp.register_handler(
Callback(
'HTTP Request',
StanzaPath('iq/http-req'),
self._handle_request
)
)
self.xmpp.register_handler(
Callback(
'HTTP Response',
StanzaPath('iq/http-resp'),
self._handle_response
)
)
register_stanza_plugin(Iq, HTTPRequest, iterable=True)
register_stanza_plugin(Iq, HTTPResponse, iterable=True)
register_stanza_plugin(HTTPRequest, Headers, iterable=True)
register_stanza_plugin(HTTPRequest, HTTPData, iterable=True)
register_stanza_plugin(HTTPResponse, Headers, iterable=True)
register_stanza_plugin(HTTPResponse, HTTPData, iterable=True)
# TODO: Should we register any api's here? self.api.register()
def plugin_init(self):
""" Start the XEP-0323 plugin """
self.xmpp.register_handler(
Callback('Sensordata Event:Req',
StanzaPath('iq@type=get/req'),
self._handle_event_req))
self.xmpp.register_handler(
Callback('Sensordata Event:Accepted',
StanzaPath('iq@type=result/accepted'),
self._handle_event_accepted))
self.xmpp.register_handler(
Callback('Sensordata Event:Rejected',
StanzaPath('iq@type=error/rejected'),
self._handle_event_rejected))
self.xmpp.register_handler(
Callback('Sensordata Event:Cancel',
StanzaPath('iq@type=get/cancel'),
self._handle_event_cancel))
self.xmpp.register_handler(
Callback('Sensordata Event:Cancelled',
StanzaPath('iq@type=result/cancelled'),
self._handle_event_cancelled))
def plugin_init(self):
self.xmpp.register_handler(
Callback('Explicit Message Encryption',
StanzaPath('message/eme'),
self._handle_eme))
register_stanza_plugin(Message, Encryption)
def plugin_init(self):
"""Start the XEP-0066 plugin."""
self.url_handlers = {'global': self._default_handler,
'jid': {}}
register_stanza_plugin(Iq, stanza.OOBTransfer)
register_stanza_plugin(Message, stanza.OOB)
register_stanza_plugin(Presence, stanza.OOB)
self.xmpp.register_handler(
Callback('OOB Transfer',
StanzaPath('iq@type=set/oob_transfer'),
self._handle_transfer))
def plugin_init(self):
"""
Start the XEP-0092 plugin.
"""
if 'name' in self.config:
self.software_name = self.config['name']
self.xmpp.register_handler(
Callback('Software Version',
StanzaPath('iq@type=get/software_version'),
self._handle_version))
register_stanza_plugin(Iq, Version)
def plugin_init(self):
register_stanza_plugin(Iq, BlockList)
register_stanza_plugin(Iq, Block)
register_stanza_plugin(Iq, Unblock)
self.xmpp.register_handler(
Callback('Blocked Contact',
StanzaPath('iq@type=set/block'),
self._handle_blocked))
self.xmpp.register_handler(
Callback('Unblocked Contact',
StanzaPath('iq@type=set/unblock'),
self._handle_unblocked))
def plugin_init(self):
register_stanza_plugin(Iq, BlockList)
register_stanza_plugin(Iq, Block)
register_stanza_plugin(Iq, Unblock)
self.xmpp.register_handler(
Callback('Blocked Contact',
StanzaPath('iq@type=set/block'),
self._handle_blocked))
self.xmpp.register_handler(
Callback('Unblocked Contact',
StanzaPath('iq@type=set/unblock'),
self._handle_unblocked))
register_stanza_plugin(Message, BitsOfBinary)
register_stanza_plugin(Presence, BitsOfBinary)
self.xmpp.register_handler(
Callback('Bits of Binary - Iq',
StanzaPath('iq/bob'),
self._handle_bob_iq))
self.xmpp.register_handler(
Callback('Bits of Binary - Message',
StanzaPath('message/bob'),
self._handle_bob))
self.xmpp.register_handler(
Callback('Bits of Binary - Presence',
StanzaPath('presence/bob'),
self._handle_bob))
self.api.register(self._get_bob, 'get_bob', default=True)
self.api.register(self._set_bob, 'set_bob', default=True)
self.api.register(self._del_bob, 'del_bob', default=True)
StanzaPath('message/set'),
self._handle_direct_set))
self.xmpp.register_handler(
Callback('Control Event:SetReq',
StanzaPath('iq@type=set/set'),
self._handle_set_req))
self.xmpp.register_handler(
Callback('Control Event:SetResponse',
StanzaPath('iq@type=result/setResponse'),
self._handle_set_response))
self.xmpp.register_handler(
Callback('Control Event:SetResponseError',
StanzaPath('iq@type=error/setResponse'),
self._handle_set_response))
# Server side dicts
self.nodes = {}
self.sessions = {}
self.last_seqnr = 0
## For testning only
self.test_authenticated_from = ""
if not isinstance(nodes, (list, set)):
nodes = [nodes]
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['from'] = ifrom
offline = iq['offline']
for node in nodes:
item = stanza.Item()
item['node'] = node
item['action'] = 'view'
offline.append(item)
collector = Collector(
'Offline_Results_%s' % iq['id'],
StanzaPath('message/offline'))
self.xmpp.register_handler(collector)
def wrapped_cb(iq):
results = collector.stop()
if iq['type'] == 'result':
iq['offline']['results'] = results
callback(iq)
iq.send(timeout=timeout, callback=wrapped_cb,
timeout_callback=timeout_callback)