Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import argparse
from asyncio.tasks import ensure_future
# Command-line entry point: run a BLE device scan and print every result.
loop = asyncio.get_event_loop()

parser = argparse.ArgumentParser(
    description="Perform Bluetooth Low Energy device scan"
)
parser.add_argument("-i", dest="dev", default="hci0", help="HCI device")
# Parse the duration as a float so fractional values (e.g. ``-t 2.5``) are
# accepted; the timeout is handed to ``discover`` as a float either way.
parser.add_argument(
    "-t", dest="timeout", type=float, default=5.0, help="Duration to scan for"
)
args = parser.parse_args()

# Drive the ``discover`` awaitable to completion on the event loop.
out = loop.run_until_complete(
    ensure_future(discover(device=args.dev, timeout=args.timeout))
)
for o in out:
    print(str(o))
async def connect(self, **kwargs) -> bool:
"""Connect to the specified GATT server.
Keyword Args:
timeout (float): Timeout for required ``discover`` call. Defaults to 2.0.
Returns:
Boolean representing connection status.
"""
# A Discover must have been run before connecting to any devices. Do a quick one here
# to ensure that it has been done.
timeout = kwargs.get("timeout", self._timeout)
await discover(timeout=timeout, device=self.device, loop=self.loop)
# Create system bus
self._bus = await txdbus_connect(reactor, busAddress="system").asFuture(
self.loop
)
# TODO: Handle path errors from txdbus/dbus
self._device_path = get_device_object_path(self.device, self.address)
def _services_resolved_callback(message):
    """Flag services as resolved when the device reports ``ServicesResolved``.

    ``message.body`` is unpacked as ``(iface, changed, invalidated)``.
    NOTE(review): presumably this is the payload of a D-Bus
    ``PropertiesChanged`` signal -- confirm where the callback is registered.
    """
    iface, changed, invalidated = message.body
    # Both conditions must hold: the signal concerns the device interface
    # AND the changed properties carry a truthy ``ServicesResolved``. The
    # interface name is compared explicitly so that it is never compared
    # against the boolean property value.
    is_resolved = iface == defs.DEVICE_INTERFACE and changed.get(
        "ServicesResolved", False
    )
    if is_resolved:
        logger.info("Services resolved.")
        self.services_resolved = True