Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_empty_hook_handle(gh_sut: GithubController, mocker):
label = mocker.patch.object(gh_sut, 'handle_labeled')
sync = mocker.patch.object(gh_sut, 'handle_synchronize')
comment = mocker.patch.object(gh_sut, 'handle_comment')
push = mocker.patch.object(gh_sut, 'handle_push')
with asynctest.patch.object(gh_sut, 'get_request_json', return_value={}):
with asynctest.patch.object(gh_sut, 'validate_webhook_secret', return_value='AUTHORIZED'):
result = await gh_sut.handle_hook(asynctest.MagicMock())
assert result.status == 200
label.assert_not_called()
sync.assert_not_called()
comment.assert_not_called()
push.assert_not_called()
def test_ok(self, synse_app):
with asynctest.patch('synse_server.cmd.plugin') as mock_cmd:
mock_cmd.return_value = {
'id': '12345',
'tag': 'test/plugin',
'active': True,
'network': {
'address': 'localhost:5001',
'protocol': 'tcp',
},
'version': {
'sdk_version': '3.0',
},
'health': {
'status': 'OK',
},
}
async def test_request_plugin_invalid_usage(self):
with asynctest.patch('synse_server.cmd.plugin') as mock_cmd:
with asynctest.patch('websockets.WebSocketCommonProtocol.send') as mock_send:
mock_cmd.return_value = [{
'key': 'value',
}]
p = make_payload(data={})
m = websocket.MessageHandler(websockets.WebSocketCommonProtocol())
with pytest.raises(errors.InvalidUsage):
await m.handle_request_plugin(p)
mock_cmd.assert_not_called()
mock_send.assert_not_called()
@patch("pexpect.pxssh.pxssh.logout")
@patch("pexpect.pxssh.pxssh.login", autospec=True)
@patch("pexpect.pxssh.pxssh.prompt")
@patch("pexpect.pxssh.pxssh.sendline")
async def test_to_get_update(mock_sendline, mock_prompt, mock_login, mock_logout, hass):
"""Testing exception in get_update matching."""
conf_dict = {
DOMAIN: {
CONF_PLATFORM: "unifi_direct",
CONF_HOST: "fake_host",
CONF_USERNAME: "fake_user",
CONF_PASSWORD: "fake_pass",
CONF_PORT: 22,
CONF_TRACK_NEW: True,
CONF_CONSIDER_HOME: timedelta(seconds=180),
}
}
async def test_invalid_service_calls(hass):
"""Test invalid service call arguments get discarded."""
add_entities = MagicMock()
await group.async_setup_platform(
hass, {"entities": ["light.test1", "light.test2"]}, add_entities
)
assert add_entities.call_count == 1
grouped_light = add_entities.call_args[0][0][0]
grouped_light.hass = hass
with asynctest.patch.object(hass.services, "async_call") as mock_call:
await grouped_light.async_turn_on(brightness=150, four_oh_four="404")
data = {"entity_id": ["light.test1", "light.test2"], "brightness": 150}
mock_call.assert_called_once_with("light", "turn_on", data, blocking=True)
mock_call.reset_mock()
await grouped_light.async_turn_off(transition=4, four_oh_four="404")
data = {"entity_id": ["light.test1", "light.test2"], "transition": 4}
mock_call.assert_called_once_with("light", "turn_off", data, blocking=True)
mock_call.reset_mock()
data = {
"brightness": 150,
"xy_color": (0.5, 0.42),
"rgb_color": (80, 120, 50),
"color_temp": 1234,
"white_value": 1,
def test_verbose(self, mock_get_cached_list_of_running_tasks_from_frameworks):
with asynctest.patch(
"paasta_tools.api.views.instance.get_cached_list_of_not_running_tasks_from_frameworks",
autospec=True,
) as mock_get_cached_list_of_not_running_tasks_from_frameworks, asynctest.patch(
"paasta_tools.api.views.instance.get_mesos_running_task_dict", autospec=True
) as mock_get_mesos_running_task_dict, asynctest.patch(
"paasta_tools.api.views.instance.get_mesos_non_running_task_dict",
autospec=True,
) as mock_get_mesos_non_running_task_dict:
running_task_1 = Task(
master=mock.Mock(), items={"id": "fake--service.fake--instance.1"}
)
running_task_2 = Task(
master=mock.Mock(), items={"id": "fake--service.fake--instance.2"}
)
non_running_task = Task(
master=mock.Mock(),
async def test_unload_entry(hass, config_entry, controller):
"""Test entries are unloaded correctly."""
controller_manager = Mock(ControllerManager)
hass.data[DOMAIN] = {DATA_CONTROLLER_MANAGER: controller_manager}
with patch.object(
hass.config_entries, "async_forward_entry_unload", return_value=True
) as unload:
assert await async_unload_entry(hass, config_entry)
await hass.async_block_till_done()
assert controller_manager.disconnect.call_count == 1
assert unload.call_count == 1
assert DOMAIN not in hass.data
@patch("kutana.backends.Vkontakte._request")
@patch("kutana.backends.Vkontakte._upload_file_to_vk")
@patch("kutana.backends.Vkontakte._make_attachment")
def test_upload_attachment_error_retry(
mock_make_attachment,
mock_upload_file_to_vk,
mock_request,
):
mock_request.side_effect = [
{"upload_url": "_"},
RequestException(None, None, None, {"error_code": 1}),
{"upload_url": "_"},
"ok",
]
mock_upload_file_to_vk.side_effect = [
"ok",
async def test_get_plugin_no_plugin(self, simple_device):
with asynctest.patch('synse_server.cache.get_device') as mock_get:
mock_get.return_value = simple_device
p = await cache.get_plugin('device-1')
assert p is None
mock_get.assert_called_once()
mock_get.assert_called_with('device-1')
def setUp(self):
mock_get_patcher = patch('aiohttp.ClientSession.get', new_callable=MockRequest)
self.mock_get = mock_get_patcher.start()
self.mock_get.method = 'GET'
self.mock_get.return_value.json = CoroutineMock()
self.mock_get.return_value.status = 200
self.addCleanup(mock_get_patcher.stop)
mock_post_patcher = patch('aiohttp.ClientSession.post', new_callable=MockRequest)
self.mock_post = mock_post_patcher.start()
self.mock_post.method = 'POST'
self.mock_post.return_value.json = CoroutineMock()
self.mock_post.return_value.status = 200
self.addCleanup(mock_post_patcher.stop)
mock_del_patcher = patch('aiohttp.ClientSession.delete', new_callable=MockRequest)
self.mock_del = mock_del_patcher.start()
self.mock_del.method = 'DEL'
self.mock_del.return_value.json = CoroutineMock()
self.mock_del.return_value.status = 200
self.addCleanup(mock_del_patcher.stop)