Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import unittest
from slixmpp.test import SlixTest
from slixmpp.stanza.message import Message
from slixmpp.stanza.htmlim import HTMLIM
from slixmpp.xmlstream import register_stanza_plugin
class TestMessageStanzas(SlixTest):
def setUp(self):
    """Attach the HTMLIM plugin to the Message stanza before each test."""
    register_stanza_plugin(Message, HTMLIM)
def testGroupchatReplyRegression(self):
    """Regression groupchat reply should be to barejid.

    Builds a groupchat message whose sender is a full JID
    (``room@service/nick``), calls ``reply()`` on it, and checks that
    the reply's ``to`` field is the room address without the resource.
    """
    msg = self.Message()
    msg['to'] = 'me@myserver.tld'
    msg['from'] = 'room@someservice.someserver.tld/somenick'
    msg['type'] = 'groupchat'
    msg['body'] = "this is a message"
    reply = msg.reply()
    # assertEqual shows both values when the check fails, whereas
    # assertTrue(a == b) would only report "False is not true".
    self.assertEqual(str(reply['to']), 'room@someservice.someserver.tld')
def testHTMLPlugin(self):
"Test message/html/body stanza"
import unittest
from slixmpp.test import SlixTest
from slixmpp.util import (
MemoryCache, MemoryPerJidCache,
FileSystemCache, FileSystemPerJidCache
)
from tempfile import TemporaryDirectory
class TestCacheClass(SlixTest):
def testMemoryCache(self):
    """Exercise store, retrieve and remove on a MemoryCache.

    A stored value must be returned by ``retrieve``; a key that was
    never stored, or that has been removed, must yield ``None``.
    """
    cache = MemoryCache()
    cache.store("test", "test_value")
    self.assertEqual(cache.retrieve("test"), "test_value")
    # assertIsNone is the idiomatic check for a None result and gives a
    # clearer failure message than assertEqual(value, None).
    self.assertIsNone(cache.retrieve("test2"))
    cache.remove("test")
    self.assertIsNone(cache.retrieve("test"))
def testMemoryPerJidcache(self):
cache = MemoryPerJidCache()
cache.store_by_jid("test@example.com", "test", "test_value")
self.assertEqual(
import time
import threading
import unittest
from slixmpp.test import SlixTest
class TestStreamDisco(SlixTest):
"""
Test using the XEP-0030 plugin.
"""
def tearDown(self):
    """Close the test stream after each test."""
    self.stream_close()
def testInfoEmptyDefaultNode(self):
"""
Info query result from an entity MUST have at least one identity
and feature, namely http://jabber.org/protocol/disco#info.
Since the XEP-0030 plugin is loaded, a disco response should
be generated and not an error result.
"""
import threading
import unittest
from slixmpp.test import SlixTest
class TestOOB(SlixTest):
def tearDown(self):
    """Close the test stream after each test."""
    self.stream_close()
def testSendOOB(self):
"""Test sending an OOB transfer request."""
self.stream_start(plugins=['xep_0066', 'xep_0030'])
url = 'http://github.com/fritzy/Slixmpp/blob/master/README'
t = threading.Thread(
name='send_oob',
target=self.xmpp['xep_0066'].send_oob,
args=('user@example.com', url),
kwargs={'desc': 'Slixmpp README'})
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0033 as xep_0033
from slixmpp.xmlstream import register_stanza_plugin
class TestAddresses(SlixTest):
def setUp(self):
    """Attach the XEP-0033 Addresses plugin to Message before each test."""
    register_stanza_plugin(Message, xep_0033.Addresses)
def testAddAddress(self):
"""Testing adding extended stanza address."""
msg = self.Message()
msg['addresses'].add_address(atype='to', jid='to@header1.org')
self.check(msg, """
<address type="to">
""")
</address>
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0184 as xep_0184
from slixmpp.xmlstream import register_stanza_plugin
class TestReciept(SlixTest):
def setUp(self):
    """Attach the XEP-0184 Request and Received plugins to Message."""
    # Registration order is kept: Request first, then Received.
    for receipt_plugin in (xep_0184.Request, xep_0184.Received):
        register_stanza_plugin(Message, receipt_plugin)
def testCreateRequest(self):
request = """
"""
msg = self.Message()
self.assertEqual(msg['request_receipt'], False)
import unittest
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0004 as xep_0004
import slixmpp.plugins.xep_0060.stanza as pubsub
from slixmpp.xmlstream.stanzabase import ET
class TestPubsubStanzas(SlixTest):
def testAffiliations(self):
"Testing iq/pubsub/affiliations/affiliation stanzas"
iq = self.Iq()
aff1 = pubsub.Affiliation()
aff1['node'] = 'testnode'
aff1['affiliation'] = 'owner'
aff2 = pubsub.Affiliation()
aff2['node'] = 'testnode2'
aff2['affiliation'] = 'publisher'
iq['pubsub']['affiliations'].append(aff1)
iq['pubsub']['affiliations'].append(aff2)
self.check(iq, """
import time
import unittest
from slixmpp.test import SlixTest
class TestEvents(SlixTest):
def setUp(self):
    """Start the test stream before each test."""
    self.stream_start()
def tearDown(self):
    """Close the test stream after each test."""
    self.stream_close()
def testEventHappening(self):
"""Test handler working"""
happened = []
def handletestevent(event):
happened.append(True)
self.xmpp.add_event_handler("test_event", handletestevent)
self.xmpp.event("test_event")
from slixmpp.xmlstream.matcher import MatchXPath
from slixmpp.xmlstream.handler import Callback
from slixmpp.exceptions import XMPPError
import unittest
from slixmpp.test import SlixTest
class TestStreamExceptions(SlixTest):
"""
Test handling of exceptions raised while processing stanzas.
"""
def tearDown(self):
    """Close the test stream after each test."""
    self.stream_close()
def testExceptionReply(self):
"""Test that raising an exception replies with the original stanza."""
def message(msg):
msg.reply()
msg['body'] = 'Body changed'
raise XMPPError(clear=False)
self.stream_start()
import time
import threading
import unittest
from slixmpp.test import SlixTest
from slixmpp.exceptions import IqTimeout
from slixmpp import Callback, MatchXPath
class TestHandlers(SlixTest):
"""
Test using handlers and waiters.
"""
def setUp(self):
    """Start the test stream before each test."""
    self.stream_start()
def tearDown(self):
    """Close the test stream after each test."""
    self.stream_close()
def testCallback(self):
"""Test using stream callback handlers."""
def callback_handler(stanza):
self.xmpp.send_raw("""