How to use the asynctest.TestCase class in asynctest

To help you get started, we’ve selected a few asynctest examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github opsdroid / opsdroid / tests / test_parser_witai.py View on Github external
import asyncio
import asynctest
import asynctest.mock as amock

from aiohttp import ClientOSError

from opsdroid.cli.start import configure_lang
from opsdroid.core import OpsDroid
from opsdroid.matchers import match_witai
from opsdroid.events import Message
from opsdroid.parsers import witai
from opsdroid.connector import Connector


class TestParserWitai(asynctest.TestCase):
    """Test the opsdroid wit.ai parser."""

    async def setup(self):
        """Call ``configure_lang`` with an empty configuration dict.

        NOTE(review): unittest-style runners invoke ``setUp`` (capital "U");
        a coroutine spelled ``setup`` is presumably never run automatically.
        Confirm whether this spelling is intended.
        """
        configure_lang({})

    async def getMockSkill(self):
        """Build a do-nothing coroutine skill with an empty ``config``.

        Returns:
            A fresh async callable ``mockedskill(opsdroid, config, message)``
            that completes without doing anything and whose ``config``
            attribute is a new empty dict.
        """

        async def mockedskill(opsdroid, config, message):
            # Deliberately empty: the skill only needs to be awaitable.
            return None

        setattr(mockedskill, "config", {})
        return mockedskill

    async def getRaisingMockSkill(self):
        """Define a coroutine skill whose body always raises ``Exception``.

        NOTE(review): the lines visible here end right after the inner
        coroutine; no ``config`` assignment or ``return`` is shown. Confirm
        against the full file.
        """
        async def mockedskill(opsdroid, config, message):
            # Fails unconditionally when awaited.
            raise Exception()
github Peter-Slump / python-keycloak-client / tests / keycloak / aio / test_openid_connect.py View on Github external
import asynctest

try:
    import aiohttp  # noqa: F401
except ImportError:
    aiohttp = None
else:
    from keycloak.aio.client import KeycloakClient
    from keycloak.aio.openid_connect import KeycloakOpenidConnect
    from keycloak.aio.realm import KeycloakRealm
    from keycloak.aio.well_known import KeycloakWellKnown


@asynctest.skipIf(aiohttp is None, 'aiohttp is not installed')
class KeycloakOpenidConnectTestCase(asynctest.TestCase):
    async def setUp(self):
        """Create a mocked realm and the OpenID Connect client under test.

        The realm and its ``client`` are ``MagicMock`` objects restricted by
        ``spec_set``; the client's ``get``/``post``/``put``/``delete``
        attributes are replaced by ``CoroutineMock`` instances.
        """
        self.realm = realm = asynctest.MagicMock(spec_set=KeycloakRealm)
        realm.client = asynctest.MagicMock(spec_set=KeycloakClient)
        # One coroutine mock per HTTP verb, assigned in the original order.
        for verb in ('get', 'post', 'put', 'delete'):
            setattr(realm.client, verb, asynctest.CoroutineMock())

        self.client_id = 'client-id'
        self.client_secret = 'client-secret'

        connect_kwargs = dict(
            realm=self.realm,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        self.openid_client = await KeycloakOpenidConnect(**connect_kwargs)
github timercrack / pydatacoll / test / test_stress.py View on Github external
import functools
import redis
try:
    import ujson as json
except ImportError:
    import json

import pydatacoll.utils.logger as my_logger
from pydatacoll.resources.protocol import *
from test.mock_device import iec104device, mock_data
from pydatacoll import api_server

logger = my_logger.get_logger('TestStress')


class StressingTest(asynctest.TestCase):
    loop = None  # make pycharm happy

    def setUp(self):
        """Set up Redis, mock data, per-device listeners and the API server.

        For every entry of ``mock_data.device_list`` that has a ``'port'``
        key, a server built from ``iec104device.IEC104Device`` is started on
        127.0.0.1 at that port and stored in ``self.server_list``.
        """
        self.redis_client = redis.StrictRedis(db=1, decode_responses=True)
        mock_data.generate()
        self.server_list = []
        for device in mock_data.device_list:
            if 'port' in device:
                # Start the listener synchronously on the test loop.
                listener = self.loop.create_server(
                    iec104device.IEC104Device, '127.0.0.1', device['port'])
                self.server_list.append(self.loop.run_until_complete(listener))
        self.api_server = api_server.APIServer(8080, self.loop)

    def tearDown(self):
        """Stop the API server held in ``self.api_server``."""
        # NOTE(review): the servers collected in self.server_list are not
        # closed in the lines visible here -- confirm against the full file.
        self.api_server.stop_server()
github Martiusweb / asynctest / test / test_case.py View on Github external
    @asynctest.fail_on(unused_loop=False)
    class LoggingTestCase(asynctest.TestCase):
        """Test case that records its lifecycle calls in a caller-owned list.

        Args:
            calls: list that receives ``'setUp'``, ``'test_basic'`` and
                ``'tearDown'`` as the corresponding methods run.
        """

        def __init__(self, calls):
            super().__init__()
            self.calls = calls

        def setUp(self):
            """Record that ``setUp`` ran."""
            self.calls.append('setUp')

        def test_basic(self):
            """Record that the test body ran."""
            # Use the same list as setUp; ``self.events`` was never assigned
            # and would raise AttributeError.
            self.calls.append('test_basic')

        def tearDown(self):
            """Record that ``tearDown`` ran."""
            self.calls.append('tearDown')

    class StartWaitProcessTestCase(asynctest.TestCase):
        @staticmethod
        @asyncio.coroutine
        def start_wait_process(loop):
            """Spawn the shell command ``true`` and briefly wait for it to end.

            Args:
                loop: event loop passed to the subprocess creation and to
                    ``asyncio.wait_for``.

            Raises:
                BaseException: whatever is raised while waiting for the
                    process is re-raised after the child has been killed.
            """
            process = yield from asyncio.create_subprocess_shell(
                "true", stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                loop=loop)

            try:
                # Give the child at most 0.1 s to finish and return its output.
                out, err = yield from asyncio.wait_for(
                    process.communicate(), timeout=.1, loop=loop)
            except BaseException:
                # Kill the child and poll it without blocking (WNOHANG)
                # before propagating the original error.
                process.kill()
                os.waitpid(process.pid, os.WNOHANG)
                raise

        @asyncio.coroutine
github Gelbpunkt / aiowiki / tests / __init__.py View on Github external
import aiowiki
import os
from random import randint

# Test settings read from the environment; each is None when unset.
AIOWIKI_TEST_URL = os.getenv("AIOWIKI_TEST_URL")
AIOWIKI_TEST_USERNAME = os.getenv("AIOWIKI_TEST_USERNAME")
AIOWIKI_TEST_PASSWORD = os.getenv("AIOWIKI_TEST_PASSWORD")

# True when at least one of the three settings is missing or empty.
small_test = not all(
    (AIOWIKI_TEST_URL, AIOWIKI_TEST_USERNAME, AIOWIKI_TEST_PASSWORD)
)


@asynctest.lenient
class Test_Aiowiki(asynctest.TestCase):
    async def test_working_by_url(self):
        """A wiki built from an explicit API URL keeps that URL and can fetch.

        The wiki is closed in a ``finally`` block so that a failing assertion
        or fetch does not leave it open.
        """
        wiki = aiowiki.Wiki("https://en.wikipedia.org/w/api.php")
        try:
            self.assertEqual(wiki.http.url, "https://en.wikipedia.org/w/api.php")
            page = wiki.get_page("Python (programming language)")
            await page.summary()  # should have a page, so it actually fetches there
        finally:
            await wiki.close()

    async def test_working_by_wikipedia(self):
        """A wiki built with ``Wiki.wikipedia("en")`` targets the English API.

        The wiki is closed in a ``finally`` block so that a failing assertion
        or fetch does not leave it open.
        """
        wiki = aiowiki.Wiki.wikipedia("en")
        try:
            self.assertEqual(wiki.http.url, "https://en.wikipedia.org/w/api.php")
            # NOTE(review): the expected repr is an empty string here; an
            # angle-bracketed literal may have been lost when this snippet
            # was extracted -- confirm against the upstream test.
            self.assertEqual(
                repr(wiki), ""
            )
            page = wiki.get_page("Python (programming language)")
            await page.summary()  # should have a page, so it actually fetches there
        finally:
            await wiki.close()
github laskoviymishka / cqrs-eventsource / tests / bus_tests.py View on Github external
import asynctest

from eventsource.services.commandbus import Command, command_handler, Bus, QuerySpec, query_processor, BusResolverError, \
    AggregateCommand, CommandHandler, QueryProcessor


class BusTests(asynctest.TestCase):
    async def test_command_handler_should_called(self):
        """Executing a command on the bus runs its registered handler."""
        handler_ran = False

        class TestCommand(Command):
            pass

        @command_handler(TestCommand)
        async def test_command_function(command: TestCommand, **kwargs):
            # Flip the flag in the enclosing test so it can be asserted on.
            nonlocal handler_ran
            self.assertIsNotNone(command)
            handler_ran = True

        command_bus = Bus()
        await command_bus.execute_command(TestCommand())
        self.assertTrue(handler_ran)

    async def test_command_should_pass_argument_from_bus_constructor(self):
github mosquito / aio-pika / tests / __init__.py View on Github external
# AMQP endpoint URL, taken from the AMQP_URL environment variable with a
# local guest account as the default.
AMQP_URL = URL(os.getenv("AMQP_URL", "amqp://guest:guest@localhost"))

# Default an empty path to the root path.
# NOTE(review): this assigns to an attribute of the URL object; if URL is an
# immutable type (e.g. yarl.URL) the assignment would fail -- confirm.
if not AMQP_URL.path:
    AMQP_URL.path = '/'


# Use uvloop's event loop policy when uvloop is importable; otherwise keep
# asyncio's default policy.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass


class AsyncTestCase(asynctest.TestCase):
    use_default_loop = False
    forbid_get_event_loop = False

    TEST_TIMEOUT = int(os.getenv('ASYNCIO_TEST_TIMEOUT', '30'))

    def _run_test_method(self, method):
        result = method()
        if asyncio.iscoroutine(result):
            self.loop.run_until_complete(
                asyncio.wait_for(result, timeout=self.TEST_TIMEOUT)
            )

    @property
    def _all_tasks(self):
        return getattr(asyncio, 'all_tasks', asyncio.Task.all_tasks)
github kippandrew / tattle / tests / test_state.py View on Github external
import asynctest
import unittest
import unittest.mock

from tattle import config
from tattle import state
from tattle import messages


# noinspection PyTypeChecker
class NodeManagerTestCase(asynctest.TestCase):
    async def setUp(self):
        """Build a ``NodeManager`` holding one local node and three others.

        The queue and events collaborators are plain mocks; both are reset
        at the end so calls made during setup are discarded.
        """
        super().setUp()
        self.queue = unittest.mock.Mock()
        self.events = unittest.mock.Mock()
        self.nodes = state.NodeManager(
            config.Configuration(), self.queue, self.events, loop=self.loop)

        # Local node first ...
        await self.nodes.set_local_node('local-node', '127.0.0.1', 7800)

        # ... then node-1..node-3 on ports 7801..7803.
        for index in range(1, 4):
            await self.nodes.on_node_alive(
                'node-{}'.format(index), 1, '127.0.0.1', 7800 + index)

        for collaborator in (self.queue, self.events):
            collaborator.reset_mock()
github marvinpinto / charlesbot / tests / test_robot.py View on Github external
import asynctest
from asynctest.mock import call
from asynctest.mock import patch
from asynctest.mock import MagicMock


class TestRobot(asynctest.TestCase):

    def setUp(self):
        """Patch ``Robot.initialize_robot`` and create the robot under test."""
        init_patcher = patch('charlesbot.robot.Robot.initialize_robot')
        # The cleanup is registered before the patch is started.
        self.addCleanup(init_patcher.stop)
        self.mock_initialize_robot = init_patcher.start()

        # Imported locally, after the patch has been started.
        from charlesbot.robot import Robot
        self.robot = Robot()

    @asynctest.ignore_loop
    def test_exit_cleanly_no_plugins(self):
        """With an empty plugin list, ``exit_cleanly`` leaves the robot stopped."""
        self.robot.plugin_list = []
        self.robot.exit_cleanly()
        # is_running() must compare equal to False after a clean exit.
        self.assertEqual(self.robot.is_running(), False)

    @asynctest.ignore_loop
github PyPlanet / PyPlanet / tests / unit / contrib / test_permissions.py View on Github external
import asynctest
import datetime

from pyplanet.core import Controller


class TestPermissions(asynctest.TestCase):
	async def test_registration(self):
		"""Register a permission and check that it can be looked up again."""
		controller = Controller.prepare(name='default').instance
		await controller.db.connect()
		await controller.apps.discover()

		# Imported locally, after the apps have been discovered.
		from pyplanet.apps.core.maniaplanet.models import Player
		await controller.permission_manager.register(
			'test1', namespace='tst1', min_level=Player.LEVEL_PLAYER,
		)
		registered = await controller.permission_manager.get_perm('tst1', 'test1')
		assert bool(registered)

	async def test_checking(self):
		instance = Controller.prepare(name='default').instance
		await instance.db.connect()