Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
"""Test that the connector is initialised properly."""
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
self.assertEqual("#general", connector.default_target)
self.assertEqual("slack", connector.name)
self.assertEqual(10, connector.timeout)
async def test_send_blocks(self):
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
connector.slack.api_call = amock.CoroutineMock()
await connector.send(
Blocks(
[{"type": "section", "text": {"type": "mrkdwn", "text": "*Test*"}}],
"user",
"room",
connector,
)
)
self.assertTrue(connector.slack.api_call.called)
payload = {
"type": "view_closed",
"team": {"id": "TXXXXXX", "domain": "coverbands"},
"user": {"id": "UXXXXXX", "name": "dreamweaver"},
}
interactive_action = InteractiveAction(payload)
with amock.patch("aiohttp.ClientSession.post") as patched_request:
patched_request.return_value = asyncio.Future()
patched_request.return_value.set_result(result)
await interactive_action.respond("Respond called without response_url")
self.assertFalse(patched_request.called)
with OpsDroid() as opsdroid:
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
raw_message = {
"text": "Hello world",
"user_id": "user_id",
"user": "user",
"room": "default",
}
message = Message(
text="Hello world",
user_id="user_id",
user="user",
target="default",
connector=connector,
raw_event=raw_message,
)
opsdroid.send = amock.CoroutineMock()
async def test_respond(self):
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
connector.slack.api_call = amock.CoroutineMock()
await connector.send(
Message(text="test", user="user", target="room", connector=connector)
)
self.assertTrue(connector.slack.api_call.called)
async def test_process_message(self):
"""Test processing a slack message."""
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
connector.lookup_username = amock.CoroutineMock()
connector.lookup_username.return_value = {"name": "testuser"}
connector.opsdroid = amock.CoroutineMock()
connector.opsdroid.parse = amock.CoroutineMock()
message = { # https://api.slack.com/events/message
"type": "message",
"channel": "C2147483705",
"user": "U2147483697",
"text": "Hello, world!",
"ts": "1355517523.000005",
"edited": {"user": "U2147483697", "ts": "1355517536.000001"},
}
await connector.process_message(data=message)
self.assertTrue(connector.opsdroid.parse.called)
async def test_abort_on_connection_error(self):
connector = ConnectorSlack({"token": "abc123"})
connector.slack_rtm._connect_and_read = amock.CoroutineMock()
connector.slack_rtm._connect_and_read.side_effect = Exception()
connector.slack_rtm.stop = amock.CoroutineMock()
with self.assertRaises(Exception):
await connector.connect()
self.assertTrue(connector.slack_rtm.stop.called)
def test_missing_api_key(self):
"""Test that creating without an API key raises an error."""
with self.assertRaises(KeyError):
ConnectorSlack({}, opsdroid=OpsDroid())
async def test_replace_usernames(self):
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
connector.lookup_username = amock.CoroutineMock()
connector.lookup_username.return_value = {"name": "user"}
result = await connector.replace_usernames("Hello <@U023BECGF>!")
self.assertEqual(result, "Hello user!")
async def test_connect_auth_fail(self):
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
opsdroid = amock.CoroutineMock()
opsdroid.eventloop = self.loop
connector.slack_rtm._connect_and_read = amock.Mock()
connector.slack_rtm._connect_and_read.side_effect = slack.errors.SlackApiError(
message="", response=""
)
await connector.connect()
self.assertLogs("_LOGGER", "error")
async def test_message_action_interactivity(self):
"""Test the message_action interactivity type in Slack interactions handler."""
connector = ConnectorSlack({"token": "abc123"}, opsdroid=OpsDroid())
connector.opsdroid = amock.CoroutineMock()
connector.opsdroid.parse = amock.CoroutineMock()
req_ob = {
"token": "Nj2rfC2hU8mAfgaJLemZgO7H",
"callback_id": "chirp_message",
"type": "message_action",
"trigger_id": "13345224609.8534564800.6f8ab1f53e13d0cd15f96106292d5536",
"response_url": "https://hooks.slack.com/app-actions/T0MJR11A4/21974584944/yk1S9ndf35Q1flupVG5JbpM6",
"team": {"id": "T0MJRM1A7", "domain": "pandamonium"},
"channel": {"id": "D0LFFBKLZ", "name": "cats"},
"user": {"id": "U0D15K92L", "name": "dr_maomao"},
"message": {
"type": "message",
"user": "U0MJRG1AL",
"ts": "1516229207.000133",