How to use the opsdroid.connector.slack.ConnectorSlack class in opsdroid

To help you get started, we’ve selected a few opsdroid examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
def test_init(self):
        """Verify attribute values of a connector built from a token-only config."""
        slack_connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        # (attribute name, expected value) pairs, checked in this order.
        expected_attrs = (
            ("default_target", "#general"),
            ("name", "slack"),
            ("timeout", 10),
        )
        for attr_name, expected in expected_attrs:
            self.assertEqual(expected, getattr(slack_connector, attr_name))
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_send_blocks(self):
        """Sending a Blocks event should result in a call to ``slack.api_call``."""
        slack_connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        # Swap the API call for a coroutine mock so nothing leaves the test.
        slack_connector.slack.api_call = amock.CoroutineMock()

        section = {"type": "section", "text": {"type": "mrkdwn", "text": "*Test*"}}
        blocks_event = Blocks([section], "user", "room", slack_connector)
        await slack_connector.send(blocks_event)

        self.assertTrue(slack_connector.slack.api_call.called)
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
# Excerpt from a larger test body: the enclosing ``def`` line and the
# definition of ``result`` are not part of this snippet.
# Note that this payload has no "response_url" key.
payload = {
            "type": "view_closed",
            "team": {"id": "TXXXXXX", "domain": "coverbands"},
            "user": {"id": "UXXXXXX", "name": "dreamweaver"},
        }

        interactive_action = InteractiveAction(payload)
        # Patch aiohttp's POST so any outgoing request would hit the mock.
        with amock.patch("aiohttp.ClientSession.post") as patched_request:
            # The mock returns an already-resolved future wrapping ``result``.
            patched_request.return_value = asyncio.Future()
            patched_request.return_value.set_result(result)
            await interactive_action.respond("Respond called without response_url")
            # The test expects respond() not to have issued a POST here.
            self.assertFalse(patched_request.called)

        # Excerpt from a larger test body; the snippet ends before any assertion.
        # NOTE(review): the connector below is given a fresh OpsDroid() rather
        # than the ``opsdroid`` bound by this ``with`` statement — confirm intended.
        with OpsDroid() as opsdroid:
            connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
            # Dict passed to the Message below as its ``raw_event``.
            raw_message = {
                "text": "Hello world",
                "user_id": "user_id",
                "user": "user",
                "room": "default",
            }
            message = Message(
                text="Hello world",
                user_id="user_id",
                user="user",
                target="default",
                connector=connector,
                raw_event=raw_message,
            )
            # Replace opsdroid.send with a coroutine mock.
            opsdroid.send = amock.CoroutineMock()
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_respond(self):
        """Sending a plain Message should result in a call to ``slack.api_call``."""
        slack_connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        # Swap the API call for a coroutine mock so nothing leaves the test.
        slack_connector.slack.api_call = amock.CoroutineMock()

        outgoing = Message(
            text="test", user="user", target="room", connector=slack_connector
        )
        await slack_connector.send(outgoing)

        self.assertTrue(slack_connector.slack.api_call.called)
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_process_message(self):
        """Feed a Slack message event to the connector and expect opsdroid.parse to be called."""
        slack_connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        slack_connector.lookup_username = amock.CoroutineMock(
            return_value={"name": "testuser"}
        )
        # Stand-in opsdroid whose ``parse`` records whether it was invoked.
        slack_connector.opsdroid = amock.CoroutineMock()
        slack_connector.opsdroid.parse = amock.CoroutineMock()

        # Event layout taken from https://api.slack.com/events/message
        slack_event = {
            "type": "message",
            "channel": "C2147483705",
            "user": "U2147483697",
            "text": "Hello, world!",
            "ts": "1355517523.000005",
            "edited": {"user": "U2147483697", "ts": "1355517536.000001"},
        }
        await slack_connector.process_message(data=slack_event)

        self.assertTrue(slack_connector.opsdroid.parse.called)
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_abort_on_connection_error(self):
        """An exception from ``_connect_and_read`` must propagate and ``stop`` must be called."""
        slack_connector = ConnectorSlack({"token": "abc123"})
        rtm = slack_connector.slack_rtm
        # Make the connect/read coroutine blow up as soon as it is awaited.
        rtm._connect_and_read = amock.CoroutineMock(side_effect=Exception())
        rtm.stop = amock.CoroutineMock()

        with self.assertRaises(Exception):
            await slack_connector.connect()

        self.assertTrue(slack_connector.slack_rtm.stop.called)
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
def test_missing_api_key(self):
        """Constructing the connector from an empty config must raise KeyError."""
        config_without_token = {}
        with self.assertRaises(KeyError):
            ConnectorSlack(config_without_token, opsdroid=OpsDroid())
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_replace_usernames(self):
        """A ``<@ID>`` mention in the text should be swapped for the looked-up name."""
        slack_connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        # Every lookup resolves to the same fake user record.
        slack_connector.lookup_username = amock.CoroutineMock(
            return_value={"name": "user"}
        )

        replaced = await slack_connector.replace_usernames("Hello <@U023BECGF>!")

        self.assertEqual(replaced, "Hello user!")
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_connect_auth_fail(self):
        """Test that a Slack API error raised while connecting is logged as an error.

        ``_connect_and_read`` is mocked to raise ``SlackApiError``. The test
        expects ``connect()`` to return without raising and to emit at least
        one log record at ERROR level or above.
        """
        connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        connector.slack_rtm._connect_and_read = amock.Mock()
        connector.slack_rtm._connect_and_read.side_effect = slack.errors.SlackApiError(
            message="", response=""
        )

        # assertLogs only verifies anything when used as a context manager
        # around the code that logs; calling it bare after the fact is a no-op.
        # No logger name is given so records reaching the root logger are
        # captured, and the level must be the upper-case name "ERROR".
        with self.assertLogs(level="ERROR"):
            await connector.connect()
github opsdroid / opsdroid / tests / test_connector_slack.py View on Github external
async def test_message_action_interactivity(self):
        """Test the message_action interactivity type in Slack interactions handler."""

        connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
        connector.opsdroid = amock.CoroutineMock()
        connector.opsdroid.parse = amock.CoroutineMock()

        req_ob = {
            "token": "Nj2rfC2hU8mAfgaJLemZgO7H",
            "callback_id": "chirp_message",
            "type": "message_action",
            "trigger_id": "13345224609.8534564800.6f8ab1f53e13d0cd15f96106292d5536",
            "response_url": "https://hooks.slack.com/app-actions/T0MJR11A4/21974584944/yk1S9ndf35Q1flupVG5JbpM6",
            "team": {"id": "T0MJRM1A7", "domain": "pandamonium"},
            "channel": {"id": "D0LFFBKLZ", "name": "cats"},
            "user": {"id": "U0D15K92L", "name": "dr_maomao"},
            "message": {
                "type": "message",
                "user": "U0MJRG1AL",
                "ts": "1516229207.000133",