How to use the asynctest.mock.CoroutineMock function in asynctest

To help you get started, we’ve selected a few asynctest examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github opsdroid / opsdroid / tests / test_core.py View on Github external
async def test_parse_dialogflow(self):
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/test.json"
        with OpsDroid() as opsdroid:
            opsdroid.config["parsers"] = {
                "dialogflow": {"project-id": "test", "enabled": True}
            }
            dialogflow_action = "smalltalk.greetings.whatsup"
            skill = amock.CoroutineMock()
            mock_connector = Connector({}, opsdroid=opsdroid)
            match_dialogflow_action(dialogflow_action)(skill)
            message = Message(
                text="Hello world",
                user="user",
                target="default",
                connector=mock_connector,
            )
            with amock.patch(
                "opsdroid.parsers.dialogflow.parse_dialogflow"
            ), amock.patch("opsdroid.parsers.dialogflow.call_dialogflow"):
                tasks = await opsdroid.parse(message)
                self.assertEqual(len(tasks), 2)

                tasks = await opsdroid.parse(message)
                self.assertLogs("_LOGGER", "warning")
github opsdroid / opsdroid / tests / test_connector_webexteams.py View on Github external
async def test_clean_up_webhooks(self):
        connector = ConnectorWebexTeams({"token": "abc123"})
        connector.api = amock.CoroutineMock()
        x = amock.CoroutineMock()
        x.id = amock.CoroutineMock()
        connector.api.webhooks.list = amock.Mock()
        connector.api.webhooks.list.return_value = [x, x]
        connector.api.webhooks.delete = amock.Mock()
        await connector.clean_up_webhooks()
        self.assertTrue(connector.api.webhooks.list.called)
        self.assertTrue(connector.api.webhooks.delete.called)
github opsdroid / opsdroid / tests / test_parser_watson.py View on Github external
async def test_parse_watson(self):
        with OpsDroid() as opsdroid:
            opsdroid.config["parsers"] = [
                {
                    "name": "watson",
                    "token": "test",
                    "gateway": "gateway",
                    "min-score": 0.3,
                    "assistant-id": "test",
                    "session-id": "12ndior2kld",
                }
            ]
            mock_skill = await self.getMockSkill()
            opsdroid.skills.append(match_watson("hello")(mock_skill))

            mock_connector = amock.CoroutineMock()
            message = Message("hi", "user", "default", mock_connector)

            with amock.patch.object(watson, "call_watson") as mocked_call_watson:
                mocked_call_watson.return_value = {
                    "output": {
                        "generic": [
                            {"response_type": "text", "text": "Hey hows it going?"}
                        ],
                        "intents": [{"intent": "hello", "confidence": 1}],
                        "entities": [
                            {
                                "entity": "greetings",
                                "location": [0, 2],
                                "value": "hello",
                                "confidence": 1,
                            },
github hsahovic / poke-env / unit_tests / player / test_player_network_interface.py View on Github external
async def test_log_in(post_mock):
    player = PlayerNetworkChild(
        player_configuration=player_configuration,
        avatar=12,
        server_configuration=server_configuration,
        log_level=38,
        start_listening=False,
    )

    player._send_message = CoroutineMock()
    player._change_avatar = CoroutineMock()

    await player._log_in(["A", "B", "C", "D"])

    player._change_avatar.assert_called_once_with(12)
    player._send_message.assert_called_once_with("/trn username,0,content")
    post_mock.assert_called_once()
github opsdroid / opsdroid / tests / test_connector_facebook.py View on Github external
async def test_facebook_message_handler(self):
        """Test the new facebook message handler."""
        import aiohttp

        opsdroid = amock.CoroutineMock()
        connector = ConnectorFacebook({}, opsdroid=opsdroid)
        req_ob = {
            "object": "page",
            "entry": [
                {
                    "messaging": [
                        {"message": {"text": "Hello"}, "sender": {"id": "1234567890"}}
                    ]
                }
            ],
        }
        mock_request = amock.CoroutineMock()
        mock_request.json = amock.CoroutineMock()
        mock_request.json.return_value = req_ob

        with OpsDroid() as opsdroid:
github opsdroid / opsdroid / tests / test_connector_webexteams.py View on Github external
async def test_subscribe_to_rooms(self):
        connector = ConnectorWebexTeams(
            {"token": "abc123", "webhook-url": "http://127.0.0.1"}
        )
        connector.api = amock.CoroutineMock()
        connector.opsdroid = amock.CoroutineMock()
        connector.opsdroid.web_server.web_app.router.add_post = amock.CoroutineMock()
        connector.api.webhooks.create = amock.CoroutineMock()
        await connector.subscribe_to_rooms()
        self.assertTrue(connector.api.webhooks.create.called)
        self.assertTrue(connector.opsdroid.web_server.web_app.router.add_post.called)
github pgjones / hypercorn / tests / protocol / test_http_stream.py View on Github external
async def _stream() -> HTTPStream:
    stream = HTTPStream(Config(), False, None, None, CoroutineMock(), CoroutineMock(), 1)
    stream.app_put = CoroutineMock()
    stream.config._log = AsyncMock(spec=Logger)
    return stream
github opsdroid / opsdroid / tests / test_parser_luisai.py View on Github external
async def test_call_luisai(self):
        opsdroid = amock.CoroutineMock()
        mock_connector = Connector({}, opsdroid=opsdroid)
        message = Message(
            text="schedule meeting",
            user="user",
            target="default",
            connector=mock_connector,
        )
        config = {"name": "luisai", "appid": "test", "appkey": "test", "verbose": True}
        result = amock.Mock()
        result.json = amock.CoroutineMock()
        result.json.return_value = {
            "query": "schedule meeting",
            "topScoringIntent": {"intent": "Calendar.Add", "score": 0.900492251},
            "intents": [{"intent": "Calendar.Add", "score": 0.900492251}],
            "entities": [],
        }
github opsdroid / opsdroid / tests / test_database_redis.py View on Github external
async def test_put(self):
        db = RedisDatabase({})
        db.client = MockRedisClient()
        db.client.execute = amock.CoroutineMock(return_value='{"key":"value"}')

        result = await db.put("string", dict(key="value"))
github opsdroid / opsdroid / tests / test_connector_webexteams.py View on Github external
async def test_clean_up_webhooks(self):
        connector = ConnectorWebexTeams({"token": "abc123"})
        connector.api = amock.CoroutineMock()
        x = amock.CoroutineMock()
        x.id = amock.CoroutineMock()
        connector.api.webhooks.list = amock.Mock()
        connector.api.webhooks.list.return_value = [x, x]
        connector.api.webhooks.delete = amock.Mock()
        await connector.clean_up_webhooks()
        self.assertTrue(connector.api.webhooks.list.called)
        self.assertTrue(connector.api.webhooks.delete.called)