Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import asyncio
import asynctest
import asynctest.mock as amock
from aiohttp import ClientOSError
from opsdroid.cli.start import configure_lang
from opsdroid.core import OpsDroid
from opsdroid.matchers import match_witai
from opsdroid.events import Message
from opsdroid.parsers import witai
from opsdroid.connector import Connector
class TestParserWitai(asynctest.TestCase):
    """Test the opsdroid wit.ai parser."""

    async def setUp(self):
        """Configure the language with an empty config before each test.

        The hook has to be spelled ``setUp``: the test runner never looks up
        the lowercase ``setup``, so with the old spelling ``configure_lang``
        was never invoked.
        """
        configure_lang({})

    # Backward-compatible alias for anything that called the old spelling.
    setup = setUp

    async def getMockSkill(self):
        """Return a no-op coroutine skill carrying an empty ``config`` dict."""

        async def mockedskill(opsdroid, config, message):
            pass

        mockedskill.config = {}
        return mockedskill

    async def getRaisingMockSkill(self):
        """Define a coroutine skill that raises ``Exception`` when awaited.

        NOTE(review): as visible here the helper neither sets ``config`` on
        the skill nor returns it (unlike ``getMockSkill``) -- confirm whether
        the remainder of the body is missing.
        """

        async def mockedskill(opsdroid, config, message):
            raise Exception()
import asynctest
try:
import aiohttp # noqa: F401
except ImportError:
aiohttp = None
else:
from keycloak.aio.client import KeycloakClient
from keycloak.aio.openid_connect import KeycloakOpenidConnect
from keycloak.aio.realm import KeycloakRealm
from keycloak.aio.well_known import KeycloakWellKnown
@asynctest.skipIf(aiohttp is None, 'aiohttp is not installed')
class KeycloakOpenidConnectTestCase(asynctest.TestCase):
    """Tests for ``KeycloakOpenidConnect`` built on a fully mocked realm."""

    async def setUp(self):
        """Build a mocked realm/client pair and the OpenID client under test."""
        self.realm = asynctest.MagicMock(spec_set=KeycloakRealm)
        self.realm.client = asynctest.MagicMock(spec_set=KeycloakClient)

        # Every HTTP verb on the mocked client becomes an awaitable mock.
        for verb in ('get', 'post', 'put', 'delete'):
            setattr(self.realm.client, verb, asynctest.CoroutineMock())

        self.client_id = 'client-id'
        self.client_secret = 'client-secret'

        credentials = dict(
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        self.openid_client = await KeycloakOpenidConnect(
            realm=self.realm, **credentials
        )
import functools
import redis
try:
import ujson as json
except ImportError:
import json
import pydatacoll.utils.logger as my_logger
from pydatacoll.resources.protocol import *
from test.mock_device import iec104device, mock_data
from pydatacoll import api_server
# Module-level logger obtained from the project's logger helper under the
# name 'TestStress'.
logger = my_logger.get_logger('TestStress')
class StressingTest(asynctest.TestCase):
    """Fixture that starts one mock IEC104 server per device plus the API server."""

    loop = None  # make pycharm happy

    def setUp(self):
        """Generate mock data, start the device servers and the API server."""
        self.redis_client = redis.StrictRedis(db=1, decode_responses=True)
        mock_data.generate()

        self.server_list = []
        for dev in mock_data.device_list:
            # Devices without a 'port' entry get no listening server.
            if 'port' in dev:
                starting = self.loop.create_server(
                    iec104device.IEC104Device, '127.0.0.1', dev['port'])
                self.server_list.append(self.loop.run_until_complete(starting))

        self.api_server = api_server.APIServer(8080, self.loop)

    def tearDown(self):
        """Stop the API server started in ``setUp``."""
        self.api_server.stop_server()
@asynctest.fail_on(unused_loop=False)
class LoggingTestCase(asynctest.TestCase):
    """Test case that records which hooks and tests ran, in order.

    Every step appends its own name to the ``calls`` list handed to the
    constructor, so the caller can inspect the execution order afterwards.
    """

    def __init__(self, calls):
        """Store the shared list that receives the step names.

        :param calls: mutable list that each hook/test appends its name to.
        """
        super().__init__()
        self.calls = calls

    def setUp(self):
        self.calls.append('setUp')

    def test_basic(self):
        # Previously appended to ``self.events``, an attribute that is never
        # assigned on this class; use the list stored by ``__init__``.
        self.calls.append('test_basic')

    def tearDown(self):
        # Same fix as in ``test_basic``: record into ``self.calls``.
        self.calls.append('tearDown')
class StartWaitProcessTestCase(asynctest.TestCase):
    """Test case exposing a helper that spawns and waits for a subprocess."""

    @staticmethod
    @asyncio.coroutine
    def start_wait_process(loop):
        """Run the shell command ``true`` and wait for it to complete.

        The process output is collected with a 0.1 second timeout. On any
        exception the process is killed, ``os.waitpid`` is called on it
        without blocking, and the exception is re-raised.

        :param loop: event loop passed through to the asyncio calls.
        """
        pipes = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        proc = yield from asyncio.create_subprocess_shell(
            "true", loop=loop, **pipes)
        try:
            waiter = asyncio.wait_for(
                proc.communicate(), timeout=.1, loop=loop)
            _stdout, _stderr = yield from waiter
        except BaseException:
            proc.kill()
            os.waitpid(proc.pid, os.WNOHANG)
            raise
@asyncio.coroutine
import aiowiki
import os
from random import randint
# Connection settings for the test wiki, read from the environment.
AIOWIKI_TEST_URL = os.environ.get("AIOWIKI_TEST_URL")
AIOWIKI_TEST_USERNAME = os.environ.get("AIOWIKI_TEST_USERNAME")
AIOWIKI_TEST_PASSWORD = os.environ.get("AIOWIKI_TEST_PASSWORD")

# True when at least one of the three settings is unset or empty.
small_test = not (
    AIOWIKI_TEST_URL and AIOWIKI_TEST_USERNAME and AIOWIKI_TEST_PASSWORD
)
@asynctest.lenient
class Test_Aiowiki(asynctest.TestCase):
    """Checks that ``aiowiki.Wiki`` can be built and can fetch a page summary.

    Each test closes its wiki in a ``finally`` clause so the wiki is closed
    even when an assertion or the fetch fails.
    """

    async def test_working_by_url(self):
        # Build the wiki from an explicit API URL.
        wiki = aiowiki.Wiki("https://en.wikipedia.org/w/api.php")
        try:
            self.assertEqual(wiki.http.url, "https://en.wikipedia.org/w/api.php")
            page = wiki.get_page("Python (programming language)")
            await page.summary()  # should have a page, so it actually fetches there
        finally:
            await wiki.close()

    async def test_working_by_wikipedia(self):
        # Build the wiki through the ``wikipedia`` constructor instead.
        wiki = aiowiki.Wiki.wikipedia("en")
        try:
            self.assertEqual(wiki.http.url, "https://en.wikipedia.org/w/api.php")
            # NOTE(review): the expected repr is an empty string, which looks
            # like a value lost in extraction -- confirm the intended text.
            self.assertEqual(
                repr(wiki), ""
            )
            page = wiki.get_page("Python (programming language)")
            await page.summary()  # should have a page, so it actually fetches there
        finally:
            await wiki.close()
import asynctest
from eventsource.services.commandbus import Command, command_handler, Bus, QuerySpec, query_processor, BusResolverError, \
AggregateCommand, CommandHandler, QueryProcessor
class BusTests(asynctest.TestCase):
    """Tests for dispatching commands through ``Bus``."""

    async def test_command_handler_should_called(self):
        # Executing a command on the bus must invoke the handler declared
        # for that command type with ``@command_handler``.
        invocations = []

        class TestCommand(Command):
            pass

        @command_handler(TestCommand)
        async def test_command_function(command: TestCommand, **kwargs):
            self.assertIsNotNone(command)
            invocations.append(command)

        command_bus = Bus()
        await command_bus.execute_command(TestCommand())

        self.assertTrue(bool(invocations))
async def test_command_should_pass_argument_from_bus_constructor(self):
# Broker address; the AMQP_URL environment variable overrides the default.
AMQP_URL = URL(os.getenv("AMQP_URL", "amqp://guest:guest@localhost"))

# NOTE(review): this assigns to ``path`` on the URL object in place, which
# only works if the ``URL`` type in use allows attribute assignment -- confirm.
if not AMQP_URL.path:
    AMQP_URL.path = '/'

# Use uvloop's event loop policy when the package is available. Only the
# import is guarded, so an ImportError raised while installing the policy is
# no longer silently swallowed.
try:
    import uvloop
except ImportError:
    pass
else:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class AsyncTestCase(asynctest.TestCase):
    """Base test case that runs coroutine test methods under a timeout."""

    use_default_loop = False
    forbid_get_event_loop = False
    # Timeout passed to ``asyncio.wait_for`` for each coroutine test;
    # overridable through the ASYNCIO_TEST_TIMEOUT environment variable.
    TEST_TIMEOUT = int(os.getenv('ASYNCIO_TEST_TIMEOUT', '30'))

    def _run_test_method(self, method):
        """Call ``method`` and, if it returned a coroutine, run it to completion.

        The coroutine is wrapped in ``asyncio.wait_for`` with
        ``TEST_TIMEOUT`` and driven on ``self.loop``.
        """
        result = method()
        if asyncio.iscoroutine(result):
            self.loop.run_until_complete(
                asyncio.wait_for(result, timeout=self.TEST_TIMEOUT)
            )

    @property
    def _all_tasks(self):
        """Return the callable that lists all tasks.

        Prefers ``asyncio.all_tasks`` and falls back to
        ``asyncio.Task.all_tasks`` only when the former is missing. The
        fallback is resolved lazily: passing it as the ``getattr`` default
        evaluated it unconditionally and raised ``AttributeError`` on Python
        versions where ``Task.all_tasks`` no longer exists.
        """
        all_tasks = getattr(asyncio, 'all_tasks', None)
        if all_tasks is None:
            all_tasks = asyncio.Task.all_tasks
        return all_tasks
import asynctest
import unittest
import unittest.mock
from tattle import config
from tattle import state
from tattle import messages
# noinspection PyTypeChecker
class NodeManagerTestCase(asynctest.TestCase):
    """Fixture providing a ``NodeManager`` with one local and three other nodes."""

    async def setUp(self):
        """Create the manager with mocked queue/events and populate its nodes."""
        super().setUp()

        self.queue = unittest.mock.Mock()
        self.events = unittest.mock.Mock()
        self.nodes = state.NodeManager(
            config.Configuration(), self.queue, self.events, loop=self.loop)

        # the local node listens on the base port
        await self.nodes.set_local_node('local-node', '127.0.0.1', 7800)

        # the other nodes are numbered 1..3 and use the following ports
        for index in range(1, 4):
            await self.nodes.on_node_alive(
                'node-{}'.format(index), 1, '127.0.0.1', 7800 + index)

        # forget the calls recorded on the mocks while populating the manager
        self.queue.reset_mock()
        self.events.reset_mock()
import asynctest
from asynctest.mock import call
from asynctest.mock import patch
from asynctest.mock import MagicMock
class TestRobot(asynctest.TestCase):
    """Tests for ``Robot`` with ``initialize_robot`` patched out."""

    def setUp(self):
        """Patch ``Robot.initialize_robot`` and create the robot under test."""
        init_patcher = patch('charlesbot.robot.Robot.initialize_robot')
        self.addCleanup(init_patcher.stop)
        self.mock_initialize_robot = init_patcher.start()

        # Imported after the patch has been started.
        from charlesbot.robot import Robot
        self.robot = Robot()

    @asynctest.ignore_loop
    def test_exit_cleanly_no_plugins(self):
        # With an empty plugin list, exit_cleanly() must leave the robot
        # reporting that it is not running.
        robot = self.robot
        robot.plugin_list = []
        robot.exit_cleanly()
        self.assertEqual(robot.is_running(), False)
@asynctest.ignore_loop
import asynctest
import datetime
from pyplanet.core import Controller
class TestPermissions(asynctest.TestCase):
    """Tests for the controller's permission manager."""

    async def test_registration(self):
        # A permission registered under a namespace can be fetched again.
        controller = Controller.prepare(name='default').instance
        await controller.db.connect()
        await controller.apps.discover()

        from pyplanet.apps.core.maniaplanet.models import Player

        await controller.permission_manager.register(
            'test1', namespace='tst1', min_level=Player.LEVEL_PLAYER)

        fetched = await controller.permission_manager.get_perm('tst1', 'test1')
        assert bool(fetched)

    async def test_checking(self):
        # NOTE(review): only the controller/database setup is visible for
        # this test -- confirm whether the rest of the body is missing.
        controller = Controller.prepare(name='default').instance
        await controller.db.connect()