Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
member="InterfacesRemoved",
).asFuture(loop)
)
rules.append(
await bus.addMatch(
parse_msg,
interface="org.freedesktop.DBus.Properties",
member="PropertiesChanged",
).asFuture(loop)
)
# Find the HCI device to use for scanning and get cached device properties
objects = await bus.callRemote(
"/",
"GetManagedObjects",
interface=defs.OBJECT_MANAGER_INTERFACE,
destination=defs.BLUEZ_SERVICE,
).asFuture(loop)
adapter_path, interface = _filter_on_adapter(objects, device)
cached_devices = dict(_filter_on_device(objects))
# dd = {'objectPath': '/org/bluez/hci0', 'methodName': 'StartDiscovery',
# 'interface': 'org.bluez.Adapter1', 'destination': 'org.bluez',
# 'signature': '', 'body': (), 'expectReply': True, 'autoStart': True,
# 'timeout': None, 'returnSignature': ''}
# Running Discovery loop.
await bus.callRemote(
adapter_path,
"SetDiscoveryFilter",
interface="org.bluez.Adapter1",
destination="org.bluez",
signature="a{sv}",
def listen_interfaces_added(bus, loop, callback):
    """Register a match rule for the ``InterfacesAdded`` signal.

    Adds a match on ``bus`` for the ``InterfacesAdded`` member of the
    interface named by ``OBJECT_MANAGER_INTERFACE`` and converts the
    pending result into something awaitable on ``loop``.

    Args:
        bus: The bus connection object; must provide ``addMatch``.
        loop: The asyncio loop handed to ``asFuture``.
        callback: The callable passed to ``addMatch`` as the signal handler.

    Returns:
        The object produced by ``.asFuture(loop)`` on the ``addMatch``
        result. NOTE(review): the previous documentation says this yields
        an integer rule id once resolved -- confirm against the bus library.

    """
    # Register the rule first, then bridge the pending result to asyncio.
    pending_rule = bus.addMatch(
        callback,
        interface=OBJECT_MANAGER_INTERFACE,
        member="InterfacesAdded",
    )
    return pending_rule.asFuture(loop)
def listen_interfaces_removed(bus, loop, callback):
    """Register a match rule for the ``InterfacesRemoved`` signal.

    Adds a match on ``bus`` for the ``InterfacesRemoved`` member of the
    interface named by ``OBJECT_MANAGER_INTERFACE`` and converts the
    pending result into something awaitable on ``loop``.

    Args:
        bus: The bus connection object; must provide ``addMatch``.
        loop: The asyncio loop handed to ``asFuture``.
        callback: The callable passed to ``addMatch`` as the signal handler.

    Returns:
        The object produced by ``.asFuture(loop)`` on the ``addMatch``
        result. NOTE(review): the previous documentation says this yields
        an integer rule id once resolved -- confirm against the bus library.

    """
    # Register the rule first, then bridge the pending result to asyncio.
    pending_rule = bus.addMatch(
        callback,
        interface=OBJECT_MANAGER_INTERFACE,
        member="InterfacesRemoved",
    )
    return pending_rule.asFuture(loop)