Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
name = 'items'
plugin_attrib = name
interfaces = {'node', 'max_items'}
def set_max_items(self, value):
    """Store ``value`` as the ``max_items`` attribute.

    The value is converted with ``str()`` before it is handed to
    ``_set_attr``.
    """
    as_text = str(value)
    self._set_attr('max_items', as_text)
class Create(ElementBase):
    """The ``create`` element in the pubsub namespace.

    Declares a single interface, ``node``.
    """

    name = 'create'
    namespace = 'http://jabber.org/protocol/pubsub'
    # Same value as ``name``.
    plugin_attrib = 'create'
    interfaces = {'node'}
class Default(ElementBase):
    """The ``default`` element in the pubsub namespace.

    Declares the interfaces ``node`` and ``type``.
    """

    name = 'default'
    namespace = 'http://jabber.org/protocol/pubsub'
    # Same value as ``name``.
    plugin_attrib = 'default'
    interfaces = {'node', 'type'}

    def get_type(self):
        """Return the ``type`` attribute, or ``'leaf'`` if it is falsy."""
        return self._get_attr('type') or 'leaf'
class Publish(ElementBase):
    """The ``publish`` element in the pubsub namespace."""

    name = 'publish'
    namespace = 'http://jabber.org/protocol/pubsub'
    # Same value as ``name``.
    plugin_attrib = 'publish'
self.xml.text = value
return self
def del_text(self):
    """Blank out the text inside the XML tag and return ``self``.

    The text is replaced with an empty string (not ``None``).
    """
    element = self.xml
    element.text = ""
    return self
class Rejected(ElementBase):
    """The ``rejected`` element in the ``urn:xmpp:iot:sensordata`` namespace.

    Declares the interfaces ``seqnr`` and ``error``; ``error`` is also
    listed as a sub-interface.
    """

    name = 'rejected'
    namespace = 'urn:xmpp:iot:sensordata'
    # Same value as ``name``.
    plugin_attrib = 'rejected'
    interfaces = {'error', 'seqnr'}
    sub_interfaces = {'error'}
class Fields(ElementBase):
""" Fields element, top level in a response message with data """
namespace = 'urn:xmpp:iot:sensordata'
name = 'fields'
plugin_attrib = name
interfaces = {'seqnr','done','nodes'}
def __init__(self, xml=None, parent=None):
    """Initialise the base element, then start with an empty node set.

    Arguments:
        xml    -- Passed through to ``ElementBase.__init__``.
        parent -- Passed through to ``ElementBase.__init__``.
    """
    # Explicit base-class call, kept as in the original.
    ElementBase.__init__(self, xml, parent)
    # Assigned after the base initialiser has run.
    self._nodes = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
from slixmpp.plugins.xep_0009.stanza.RPC import RPCQuery, MethodCall, MethodResponse
from slixmpp.plugins.xep_0009.binding import py2xml, xml2py
from slixmpp.plugins.xep_0060.stanza.pubsub_event import Event, EventItems, EventItem
from slixmpp.xmlstream import ElementBase, ET, register_stanza_plugin
# JID,
from slixmpp import Iq
from packaging import version
from fah.messagereader import MessageReader
from fah.saslhandler import SaslHandler
from fah.settings import SettingsFah
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class ItemUpdate(ElementBase):
    """The ``update`` element in the ``http://abb.com/protocol/update`` namespace.

    Declares a single interface, ``data``.
    """

    namespace = 'http://abb.com/protocol/update'
    name = 'update'
    plugin_attrib = name
    # Must be a set literal: set('data') iterates the string and yields
    # {'d', 'a', 't'} instead of the single interface name 'data'.
    interfaces = {'data'}
class ItemUpdateEncrypted(ElementBase):
    """The ``update`` element in the ``http://abb.com/protocol/update_encrypted`` namespace.

    Declares a single interface, ``data``.
    """

    namespace = 'http://abb.com/protocol/update_encrypted'
    name = 'update'
    plugin_attrib = name
    # Must be a set literal: set('data') iterates the string and yields
    # {'d', 'a', 't'} instead of the single interface name 'data'.
    interfaces = {'data'}
def data2py(update):
namespace = 'http://abb.com/protocol/update'
vals = []
class Title(ElementBase):
    """The ``TITLE`` element in the ``vcard-temp`` namespace.

    The title value is stored as the text of the underlying XML element.
    """

    namespace = 'vcard-temp'
    name = 'TITLE'
    # Same value as ``name``.
    plugin_attrib = 'TITLE'
    plugin_multi_attrib = 'titles'
    interfaces = {'TITLE'}
    is_extension = True

    def get_title(self):
        """Return the text of the XML element."""
        return self.xml.text

    def set_title(self, value):
        """Set the text of the XML element to ``value``."""
        self.xml.text = value
class Role(ElementBase):
    """The ``ROLE`` element in the ``vcard-temp`` namespace.

    The role value is stored as the text of the underlying XML element.
    """

    namespace = 'vcard-temp'
    name = 'ROLE'
    # Same value as ``name``.
    plugin_attrib = 'ROLE'
    plugin_multi_attrib = 'roles'
    interfaces = {'ROLE'}
    is_extension = True

    def get_role(self):
        """Return the text of the XML element."""
        return self.xml.text

    def set_role(self, value):
        """Set the text of the XML element to ``value``."""
        self.xml.text = value
class Note(ElementBase):
name = 'NOTE'
"""
self.del_items()
for item in items:
jid, node, name = item
self.add_item(jid, node, name)
def del_items(self):
    """Remove all items.

    Resets ``_items`` to an empty set, then detaches every ``DiscoItem``
    found in ``iterables`` from both the XML tree and ``iterables``.
    """
    self._items = set()
    # Snapshot the matches first, because ``iterables`` is mutated below.
    stale = tuple(
        entry for entry in self.iterables if isinstance(entry, DiscoItem)
    )
    for entry in stale:
        self.xml.remove(entry.xml)
        self.iterables.remove(entry)
class DiscoItem(ElementBase):
    """The ``item`` element in the ``disco#items`` namespace.

    Declares the interfaces ``jid``, ``node`` and ``name``.
    """

    namespace = 'http://jabber.org/protocol/disco#items'
    name = 'item'
    # Same value as ``name``.
    plugin_attrib = 'item'
    interfaces = {'jid', 'node', 'name'}

    def get_node(self):
        """Return the item's node name or ``None``."""
        node = self._get_attr('node', None)
        return node

    def get_name(self):
        """Return the item's human readable name, or ``None``."""
        label = self._get_attr('name', None)
        return label
# Register DiscoItem as a plugin of DiscoItems, with iterable=True.
register_stanza_plugin(DiscoItems, DiscoItem, iterable=True)
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.

Overrides ElementBase.setup.

Sets a default error type ('cancel') and condition
('feature-not-implemented'), and changes the parent stanza's
type to 'error'.

Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
if ElementBase.setup(self, xml):
# A truthy result from ElementBase.setup selects the defaults below.
#If we had to generate XML then set default values.
self['type'] = 'cancel'
self['condition'] = 'feature-not-implemented'
# NOTE(review): self.parent is called before being indexed, so it is
# presumably a weak reference or other callable -- confirm.
if self.parent is not None:
self.parent()['type'] = 'error'
class MailSender(ElementBase):
    """The ``sender`` element in the ``google:mail:notify`` namespace.

    Declares the interfaces ``address``, ``name``, ``originator`` and
    ``unread``. The last two are read as booleans.
    """

    name = 'sender'
    namespace = 'google:mail:notify'
    # Same value as ``name``.
    plugin_attrib = 'sender'
    interfaces = {'address', 'name', 'originator', 'unread'}

    def get_originator(self):
        """Return ``True`` only if the ``originator`` attribute is ``'1'``."""
        flag = self.xml.attrib.get('originator', '0')
        return flag == '1'

    def get_unread(self):
        """Return ``True`` only if the ``unread`` attribute is ``'1'``."""
        flag = self.xml.attrib.get('unread', '0')
        return flag == '1'
class NewMail(ElementBase):
    """The ``new-mail`` element in the ``google:mail:notify`` namespace.

    Its plugin attribute is ``gmail_notification``, not the element name.
    """

    name = 'new-mail'
    namespace = 'google:mail:notify'
    plugin_attrib = 'gmail_notification'
# Register MailThread as a plugin of MailBox, with iterable=True.
register_stanza_plugin(MailBox, MailThread, iterable=True)
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.stanza import Message, Presence, Iq
from slixmpp.xmlstream import ElementBase
class Forwarded(ElementBase):
name = 'forwarded'
namespace = 'urn:xmpp:forward:0'
plugin_attrib = 'forwarded'
interfaces = {'stanza'}
def get_stanza(self):
    """Return the first contained Message, Presence or Iq stanza.

    Iterates over ``self`` and returns the first matching child. Returns
    an empty string when no child matches.
    """
    wrapped_types = (Message, Presence, Iq)
    matches = (child for child in self if isinstance(child, wrapped_types))
    return next(matches, '')
def set_stanza(self, value):
    """Replace the current stanza with ``value``.

    Calls ``del_stanza()`` first, then appends ``value``.
    """
    # Delete before appending, as in the original.
    self.del_stanza()
    self.append(value)
def del_stanza(self):
from slixmpp.xmlstream import ET, ElementBase, register_stanza_plugin
class Privacy(ElementBase):
    """The ``query`` element in the ``jabber:iq:privacy`` namespace.

    Declares no interfaces of its own; lists are added with ``add_list``.
    """

    namespace = 'jabber:iq:privacy'
    name = 'query'
    plugin_attrib = 'privacy'
    interfaces = set()

    def add_list(self, name):
        """Create a ``List`` named ``name``, append it, and return it."""
        new_list = List()
        new_list['name'] = name
        self.append(new_list)
        return new_list
class Active(ElementBase):
    """The ``active`` element in the ``jabber:iq:privacy`` namespace."""

    namespace = 'jabber:iq:privacy'
    name = 'active'
    # Same value as ``name``.
    plugin_attrib = 'active'
class Affiliation(ElementBase):
    """The ``affiliation`` element in the pubsub namespace.

    Declares the interfaces ``node``, ``affiliation`` and ``jid``.
    """

    name = 'affiliation'
    namespace = 'http://jabber.org/protocol/pubsub'
    # Same value as ``name``.
    plugin_attrib = 'affiliation'
    interfaces = {'node', 'affiliation', 'jid'}

    def get_jid(self):
        """Return the ``jid`` attribute wrapped in a ``JID``."""
        raw = self._get_attr('jid')
        return JID(raw)

    def set_jid(self, value):
        """Store ``str(value)`` as the ``jid`` attribute."""
        as_text = str(value)
        self._set_attr('jid', as_text)
class Subscription(ElementBase):
    """The ``subscription`` element in the pubsub namespace.

    Declares the interfaces ``jid``, ``node``, ``subscription`` and
    ``subid``.
    """

    name = 'subscription'
    namespace = 'http://jabber.org/protocol/pubsub'
    # Same value as ``name``.
    plugin_attrib = 'subscription'
    interfaces = {'jid', 'node', 'subscription', 'subid'}

    def get_jid(self):
        """Return the ``jid`` attribute wrapped in a ``JID``."""
        raw = self._get_attr('jid')
        return JID(raw)

    def set_jid(self, value):
        """Store ``str(value)`` as the ``jid`` attribute."""
        as_text = str(value)
        self._set_attr('jid', as_text)
class Subscriptions(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub'
name = 'subscriptions'
plugin_attrib = name