Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_failed_update_successful_login(hass):
    """Check the controller is still available after an update that needs login.

    The clients ``update`` call is patched to raise
    ``aiounifi.LoginRequired`` and ``login`` is patched to return a ``Mock``;
    after running the update the controller must report itself as available.
    """
    unifi_controller = await setup_unifi_integration(
        hass,
        ENTRY_CONFIG,
        options={},
        sites=SITES,
        clients_response=[],
        devices_response=[],
        clients_all_response=[],
    )

    # First patch makes the update fail with LoginRequired ...
    with patch.object(
        unifi_controller.api.clients, "update", side_effect=aiounifi.LoginRequired
    ):
        # ... second patch replaces the login call made in response.
        with patch.object(unifi_controller.api, "login", return_value=Mock(True)):
            await unifi_controller.async_update()
    await hass.async_block_till_done()

    assert unifi_controller.available is True
# NOTE(review): excerpt from the end of a test; its `def` line and the setup
# that defines `set_callback`, `warning_device_device` and `gateway` are not
# visible here.
await hass.async_block_till_done()
set_callback.assert_called_with("/lights/1/state", {"on": False})
# Turning switch.warning_device on: the patched `_async_set_callback` is
# expected to receive the "lselect" alert for /lights/3/state.
with patch.object(
warning_device_device, "_async_set_callback", return_value=True
) as set_callback:
await hass.services.async_call(
switch.DOMAIN,
switch.SERVICE_TURN_ON,
{"entity_id": "switch.warning_device"},
blocking=True,
)
await hass.async_block_till_done()
set_callback.assert_called_with("/lights/3/state", {"alert": "lselect"})
# Turning switch.warning_device off: the patched callback is expected to
# receive the "none" alert for the same path.
with patch.object(
warning_device_device, "_async_set_callback", return_value=True
) as set_callback:
await hass.services.async_call(
switch.DOMAIN,
switch.SERVICE_TURN_OFF,
{"entity_id": "switch.warning_device"},
blocking=True,
)
await hass.async_block_till_done()
set_callback.assert_called_with("/lights/3/state", {"alert": "none"})
# After resetting the gateway no states may remain registered in hass.
await gateway.async_reset()
assert len(hass.states.async_all()) == 0
"""Test that scenes works."""
# NOTE(review): the `def` line owning this docstring is not visible in this
# excerpt.
# Build the gateway state from a copy of DECONZ_WEB_REQUEST with GROUPS
# copied in under the "groups" key.
data = deepcopy(DECONZ_WEB_REQUEST)
data["groups"] = deepcopy(GROUPS)
gateway = await setup_deconz_integration(
hass, ENTRY_CONFIG, options={}, get_state_response=data
)
# Exactly one state is expected, and the scene entity must be known to the
# gateway and present in hass.
assert "scene.light_group_scene" in gateway.deconz_ids
assert len(hass.states.async_all()) == 1
light_group_scene = hass.states.get("scene.light_group_scene")
assert light_group_scene
group_scene = gateway.api.groups["1"].scenes["1"]
# Calling scene.turn_on for the entity is expected to invoke the patched
# `_async_set_state_callback` with the recall path and an empty payload.
with patch.object(
group_scene, "_async_set_state_callback", return_value=True
) as set_callback:
await hass.services.async_call(
"scene", "turn_on", {"entity_id": "scene.light_group_scene"}, blocking=True
)
await hass.async_block_till_done()
set_callback.assert_called_with("/groups/1/scenes/1/recall", {})
# After resetting the gateway no states may remain registered in hass.
await gateway.async_reset()
assert len(hass.states.async_all()) == 0
# NOTE(review): excerpt from the middle of a test; its `def` line and the
# setup that defines `on_off_switch_device` and `warning_device_device` are
# not visible here.
# The warning device is expected to start in the "off" state.
warning_device = hass.states.get("switch.warning_device")
assert warning_device.state == "off"
# Turning switch.on_off_switch on: the patched `_async_set_callback` is
# expected to receive {"on": True} for /lights/1/state.
with patch.object(
on_off_switch_device, "_async_set_callback", return_value=True
) as set_callback:
await hass.services.async_call(
switch.DOMAIN,
switch.SERVICE_TURN_ON,
{"entity_id": "switch.on_off_switch"},
blocking=True,
)
await hass.async_block_till_done()
set_callback.assert_called_with("/lights/1/state", {"on": True})
# Turning switch.on_off_switch off: the patched callback is expected to
# receive {"on": False} for the same path.
with patch.object(
on_off_switch_device, "_async_set_callback", return_value=True
) as set_callback:
await hass.services.async_call(
switch.DOMAIN,
switch.SERVICE_TURN_OFF,
{"entity_id": "switch.on_off_switch"},
blocking=True,
)
await hass.async_block_till_done()
set_callback.assert_called_with("/lights/1/state", {"on": False})
# NOTE(review): the service call below is cut off in this excerpt; its
# remaining arguments and the following assertion are not visible.
with patch.object(
warning_device_device, "_async_set_callback", return_value=True
) as set_callback:
await hass.services.async_call(
switch.DOMAIN,
# Patch decorators are applied bottom-up, so the mocks are passed in the
# order DNSClient, on_answer, query -- matching the parameter list below.
@asynctest.patch.object(server_protocol.DNSClient, 'query')
@patch.object(httpproxy.DOHApplication, 'on_answer')
@asynctest.patch('dohproxy.httpproxy.DNSClient')
@unittest_run_loop
async def test_mock_dnsclient_assigned_logger(self, MockedDNSClient,
Mockedon_answer,
Mockedquery):
""" Test that MockedDNSClient is created with the doh-httpproxy
logger and DEBUG level.

NOTE(review): the assertion that checks this is not visible in this
excerpt; the visible lines only stub the mocks and issue a GET request.
"""
# Stub the patched query to return self.dnsq and on_answer to return a
# canned 200 response.
Mockedquery.return_value = self.dnsq
Mockedon_answer.return_value = aiohttp.web.Response(status=200,
body=b'Done')
# Build the query parameters from self.dnsq.to_wire() and issue a GET
# against self.endpoint.
params = utils.build_query_params(self.dnsq.to_wire())
request = await self.client.request(
'GET', self.endpoint, params=params)
request.remote = "127.0.0.1"
async def test_handling_action_hooks(gh_sut: GithubController, hook_data, hook_method):
    """Check that ``handle_hook`` hands the hook payload to ``hook_method``.

    ``get_request_json`` is patched to return ``hook_data`` and
    ``validate_webhook_secret`` to return ``'AUTHORIZED'``; the method named
    by ``hook_method`` is replaced by a mock so the call can be inspected.
    """
    with asynctest.patch.object(gh_sut, 'get_request_json', return_value=hook_data), \
            asynctest.patch.object(gh_sut, 'validate_webhook_secret', return_value='AUTHORIZED'), \
            asynctest.patch.object(gh_sut, hook_method) as handler_mock:
        response = await gh_sut.handle_hook(asynctest.MagicMock())

        assert response.status == 200
        handler_mock.assert_called_once_with(hook_data)
async def test_triggear_sync_label_should_survive_two_gh_exceptions_when_looking_for_pr(gh_sut: GithubController,
                                                                                        mocker: MockFixture):
    """Check that ``set_sync_label`` keeps calling ``set_pr_sync_label`` after two failures.

    ``set_pr_sync_label`` is patched to raise ``GithubException(404)`` on its
    first two calls and return ``None`` on the third, so exactly three calls
    are expected.
    """
    labels_lookup = mocker.patch.object(gh_sut, 'get_repo_labels', return_value=[Labels.pr_sync])

    # Two distinct 404 exceptions followed by one successful (None) result.
    outcomes = [github.GithubException(404, 'Not found') for _ in range(2)]
    outcomes.append(None)

    with asynctest.patch.object(gh_sut, 'set_pr_sync_label', side_effect=outcomes) as label_setter:
        await gh_sut.set_sync_label('test_repo_1', 38)

        labels_lookup.assert_called_once_with('test_repo_1')
        assert label_setter.call_count == 3
async def test_trigger_registered_job(gh_sut: GithubController,
                                      mocker: MockFixture):
    """Check that ``trigger_registered_job`` calls each of its three collaborators once.

    ``get_jobs_next_build_number`` is patched to return ``1`` and
    ``is_job_building`` to return ``False``; ``build_jenkins_job`` is replaced
    by a plain mock.
    """
    next_build_mock: MagicMock = mocker.patch.object(gh_sut, 'get_jobs_next_build_number', return_value=1)

    with patch.object(gh_sut, 'build_jenkins_job') as build_job_mock, \
            patch.object(gh_sut, 'is_job_building', return_value=False) as is_building_mock:
        await gh_sut.trigger_registered_job(
            'job_name',
            ['branch', 'sha'],
            'test_repo',
            'test_sha',
            'test_branch',
        )

        is_building_mock.assert_called_once()
        next_build_mock.assert_called_once()
        build_job_mock.assert_called_once()