Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self['system.is_freenas'] = Mock(return_value=True)
self.__schemas = Schemas()
self.call_hook = CoroutineMock()
self.call_hook_inline = Mock()
def testStartTcpServer(self):
''' Test that the modbus tcp asyncio server starts correctly '''
identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
self.loop = asynctest.Mock(self.loop)
server = yield from StartTcpServer(context=self.context,loop=self.loop,identity=identity)
self.assertEqual(server.control.Identity.VendorName, 'VendorName')
if PYTHON_VERSION >= (3, 6):
self.loop.create_server.assert_called_once()
async def test_result_with_wrapped_object(self):
stub = StubClient()
mock = asynctest.Mock(stub, wraps=stub)
cache = {}
stub.add_user(StubClient.User(1, "a.dmin"))
cache_users(mock, cache)
mock.get_users.assert_called()
self.assertEqual(stub.users_to_return, mock.get_users())
def mock_connection_factory(monkeypatch):
"""Mock the create functions for serial and TCP Asyncio connections."""
from dsmr_parser.clients.protocol import DSMRProtocol
transport = asynctest.Mock(spec=asyncio.Transport)
protocol = asynctest.Mock(spec=DSMRProtocol)
@asyncio.coroutine
def connection_factory(*args, **kwargs):
"""Return mocked out Asyncio classes."""
return (transport, protocol)
connection_factory = Mock(wraps=connection_factory)
# apply the mock to both connection factories
monkeypatch.setattr(
'dsmr_parser.clients.protocol.create_dsmr_reader',
connection_factory)
monkeypatch.setattr(
'dsmr_parser.clients.protocol.create_tcp_dsmr_reader',
connection_factory)
async def test_connection_status_signalling(hass):
"""Make sure that connection status triggers a dispatcher send."""
data = deepcopy(DECONZ_WEB_REQUEST)
gateway = await setup_deconz_integration(
hass, ENTRY_CONFIG, options={}, get_state_response=data
)
event_call = Mock()
unsub = async_dispatcher_connect(hass, gateway.signal_reachable, event_call)
gateway.async_connection_status_callback(False)
await hass.async_block_till_done()
assert gateway.available is False
assert len(event_call.mock_calls) == 1
unsub()
async def test__interfaces_service__create_lagg():
m = Middleware()
m['interface.query'] = Mock(return_value=INTERFACES)
m['interface.lag_supported_protocols'] = Mock(return_value=['LACP'])
m['interface.get_next_name'] = Mock()
m['datastore.query'] = Mock(return_value=[])
m['datastore.insert'] = Mock(return_value=5)
await InterfaceService(m).create(Mock(), {
'type': 'LINK_AGGREGATION',
'lag_protocol': 'LACP',
'lag_ports': ['em0', 'em1'],
})
called = False
def handler(evt):
nonlocal called
called = True
assert evt.data == {
"component_id": "main",
"device_id": device.device_id,
"location_id": event.location_id,
"value": "pushed",
"name": device.label,
"data": None,
}
hass.bus.async_listen(EVENT_BUTTON, handler)
broker = smartthings.DeviceBroker(hass, config_entry, Mock(), Mock(), [device], [])
broker.connect()
# pylint:disable=protected-access
await broker._event_handler(request, None, None)
await hass.async_block_till_done()
assert called
async def test_failed_update_successful_login(hass):
"""Running update can login when requested."""
controller = await setup_unifi_integration(
hass,
ENTRY_CONFIG,
options={},
sites=SITES,
clients_response=[],
devices_response=[],
clients_all_response=[],
)
with patch.object(
controller.api.clients, "update", side_effect=aiounifi.LoginRequired
), patch.object(controller.api, "login", return_value=Mock(True)):
await controller.async_update()
await hass.async_block_till_done()
assert controller.available is True
def _get_mock(instance):
"""Create a mock and copy instance attributes over mock."""
if isinstance(instance, Mock):
instance.__dict__.update(
instance._mock_wraps.__dict__ # pylint: disable=protected-access
)
return instance
mock = Mock(spec=instance, wraps=instance)
mock.__dict__.update(instance.__dict__)
return mock