Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_configure_reporting_manuf():
    """Check the request issued by ``configure_reporting`` with no manufacturer code.

    A cluster is created from id 6 on a mocked endpoint and its ``request``
    method is replaced with a mock, so the call made by
    ``configure_reporting(0, 10, 20, 1)`` can be inspected.
    """
    endpoint = mock.MagicMock()
    cluster = zcl.Cluster.from_id(endpoint, 6)
    cluster.request = mock.MagicMock(name="request")

    cluster.configure_reporting(0, 10, 20, 1)

    # Only the leading flag, the command id and the keyword arguments are
    # pinned; the two middle positional arguments and the tsn may be anything.
    expected_kwargs = {
        "expect_reply": True,
        "manufacturer": None,
        "tries": 1,
        "tsn": mock.ANY,
    }
    cluster.request.assert_called_with(
        True, 0x06, mock.ANY, mock.ANY, **expected_kwargs
    )

    # Clear the recorded calls before the next scenario.
    cluster.request.reset_mock()
    manufacturer_id = 0xFCFC
210,
b"\x02\x01\x00",
)
async def mock_send(nwk, aps, radius, tsn, data):
    """Stand-in for ``sendBroadcast`` steered by flags from the enclosing scope.

    Raises ``EzspError`` when ``ezsp_running`` is false.  Otherwise returns
    ``[t.EmberStatus.ERR_FATAL]`` when ``broadcast_success`` is false and
    ``[0]`` when it is true.  On the success path the pending request stored
    under ``tsn`` is resolved first, unless ``send_timeout`` is set.
    """
    if not ezsp_running:
        raise EzspError
    if not broadcast_success:
        return [t.EmberStatus.ERR_FATAL]
    if not send_timeout:
        # Complete the pending future so the caller does not wait on it.
        app._pending[tsn].result.set_result((0, "sendBroadcast failure"))
    return [0]
# Route every sendBroadcast call through the mock_send stub.
app._ezsp.sendBroadcast.side_effect = mock_send
# Pin the sequence number so the message tag handed on can be asserted below.
app.get_sequence = mock.MagicMock(return_value=mock.sentinel.msg_tag)
res = await app.broadcast(
profile, cluster, src_ep, dst_ep, grpid, radius, tsn, data
)
# Exactly one sendBroadcast call is expected; its positional arguments at
# indexes 2, 3 and 4 must be the radius, the pinned message tag and the data.
assert app._ezsp.sendBroadcast.call_count == 1
assert app._ezsp.sendBroadcast.call_args[0][2] == radius
assert app._ezsp.sendBroadcast.call_args[0][3] == mock.sentinel.msg_tag
assert app._ezsp.sendBroadcast.call_args[0][4] == data
# No pending entries may remain once broadcast() has returned.
assert len(app._pending) == 0
return res
def ota_cluster():
    """Build the cluster registered under id 0x0019 on a mocked endpoint.

    The endpoint's ``device.application.ota`` attribute is a ``MagicMock``
    restricted (via ``spec_set``) to the ``ota.OTA`` interface.
    """
    endpoint = mock.MagicMock()
    endpoint.device.application.ota = mock.MagicMock(spec_set=ota.OTA)
    cluster_cls = zcl.Cluster._registry[0x0019]
    return cluster_cls(endpoint)
def test_queue_message_empty_msg_1(self):
    """Queueing an empty message must not trigger any call on the plugin."""
    # NOTE(review): the ``yield from`` makes this a generator function; it only
    # executes if a coroutine-aware runner or decorator (not visible here)
    # drives it -- confirm.
    plugin = MagicMock()
    yield from self.robot.queue_message("", plugin)
    self.assertEqual(plugin.method_calls, [])
def test_general_command_reply(cluster):
    """Check that ``general_command`` with a true second argument replies.

    Both ``request`` and ``reply`` on the cluster are replaced with mocks so
    the test can verify that only ``reply`` is used, first without and then
    with an explicit ``tsn``.
    """
    cluster.request = mock.MagicMock()
    cluster.reply = mock.MagicMock()
    cmd_id = 0x0D
    manufacturer = 0x4567

    # Without an explicit tsn the reply is sent with tsn=None.
    cluster.general_command(cmd_id, True, [], manufacturer=manufacturer)
    assert cluster.request.call_count == 0
    assert cluster.reply.call_count == 1
    cluster.reply.assert_called_with(
        True, cmd_id, mock.ANY, True, [], manufacturer=manufacturer, tsn=None
    )

    # Start from a clean slate, then repeat with an explicit tsn.
    for stub in (cluster.request, cluster.reply):
        stub.reset_mock()
    cluster.general_command(
        cmd_id, True, [], manufacturer=manufacturer, tsn=mock.sentinel.tsn
    )
    assert cluster.request.call_count == 0
def _group_remove_mock(ep, success=True, no_groups_cluster=False, not_member=False):
    """Prepare ``ep`` with a mocked request path and mocked group registry.

    Args:
        ep: Endpoint to patch in place.
        success: Whether the mocked request answers with ``ZCLStatus.SUCCESS``
            (otherwise ``ZCLStatus.DUPLICATE_EXISTS``).
        no_groups_cluster: When true, input cluster 4 is not added to ``ep``.
        not_member: When true, the mocked registry reports that the looked-up
            group is not contained in it.

    Returns:
        A ``(ep, grp)`` tuple, where ``grp`` is the mock returned by item
        lookup on the mocked registry.
    """

    async def mock_req(*args, **kwargs):
        # The reply is always [status, group_id]; only the status varies.
        status = ZCLStatus.SUCCESS if success else ZCLStatus.DUPLICATE_EXISTS
        return [status, mock.sentinel.group_id]

    if not no_groups_cluster:
        ep.add_input_cluster(4)
    ep.request = mock.MagicMock(side_effect=mock_req)

    grp = mock.MagicMock(spec_set=group.Group)
    groups = mock.MagicMock(spec_set=group.Groups)
    groups.__contains__.return_value = not not_member
    groups.__getitem__.return_value = grp
    ep.device.application.groups = groups
    return ep, grp
async def test_parse_regex_raises(self):
    """A skill from ``getRaisingMockSkill`` is still returned by ``parse_regex``.

    The skill is registered with a catch-all regex matcher and the first
    entry returned by ``parse_regex`` must reference that same skill.
    """
    with OpsDroid() as opsdroid:
        raising_skill = await self.getRaisingMockSkill()
        decorated = match_regex(r"(.*)")(raising_skill)
        opsdroid.skills.append(decorated)
        self.assertEqual(len(opsdroid.skills), 1)

        # The connector only needs an awaitable ``send``.
        connector = amock.MagicMock()
        connector.send = amock.CoroutineMock()
        message = Message("Hello world", "user", "default", connector)

        matched = await parse_regex(opsdroid, opsdroid.skills, message)
        self.assertEqual(raising_skill, matched[0]["skill"])
def test_MagicMock_is_not_CoroutineFunctionMock(self):
    """A plain ``MagicMock`` must not be a ``CoroutineFunctionMock`` instance."""
    plain_mock = asynctest.mock.MagicMock()
    self.assertNotIsInstance(plain_mock, asynctest.mock.CoroutineFunctionMock)