Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from bleak.backends.corebluetooth import (
BleakClientCoreBluetooth as BleakClient,
discover,
) # noqa
elif platform.system() == "Windows":
# Requires Windows 10 Creators update at least, i.e. Window 10.0.16299
_vtup = platform.win32_ver()[1].split(".")
if int(_vtup[0]) != 10:
raise BleakError(
"Only Windows 10 is supported. Detected was {0}".format(
platform.win32_ver()
)
)
if (int(_vtup[1]) == 0) and (int(_vtup[2]) < 16299):
raise BleakError(
"Requires at least Windows 10 version 0.16299 (Fall Creators Update)."
)
from bleak.backends.dotnet.discovery import discover # noqa
from bleak.backends.dotnet.client import BleakClientDotNet as BleakClient # noqa
def cli():
import argparse
from asyncio.tasks import ensure_future
loop = asyncio.get_event_loop()
parser = argparse.ArgumentParser(
description="Perform Bluetooth Low Energy device scan"
)
Service org.bluez
Interface org.bluez.Device1
Object path [variable prefix]/{hci0,hci1,...}/dev_XX_XX_XX_XX_XX_XX
Args:
hci_device (str): Which bluetooth adapter to connect with.
address (str): The MAC adress of the bluetooth device.
Returns:
String representation of device object path on format
`/org/bluez/{hci0,hci1,...}/dev_XX_XX_XX_XX_XX_XX`.
"""
if not validate_mac_address(address):
raise BleakError("{0} is not a valid MAC adrdess.".format(address))
if not validate_hci_device(hci_device):
raise BleakError("{0} is not a valid HCI device.".format(hci_device))
# TODO: Join using urljoin? Or pathlib?
return "/org/bluez/{0}/dev_{1}".format(
hci_device, "_".join(address.split(":")).upper()
)
from bleak.backends.bluezdbus.discovery import discover # noqa
from bleak.backends.bluezdbus.client import (
BleakClientBlueZDBus as BleakClient
) # noqa
elif platform.system() == "Darwin":
# TODO: Check if macOS version has Core Bluetooth, raise error otherwise.
from bleak.backends.corebluetooth import (
BleakClientCoreBluetooth as BleakClient,
discover,
) # noqa
elif platform.system() == "Windows":
# Requires Windows 10 Creators update at least, i.e. Window 10.0.16299
_vtup = platform.win32_ver()[1].split(".")
if int(_vtup[0]) != 10:
raise BleakError(
"Only Windows 10 is supported. Detected was {0}".format(
platform.win32_ver()
)
)
if (int(_vtup[1]) == 0) and (int(_vtup[2]) < 16299):
raise BleakError(
"Requires at least Windows 10 version 0.16299 (Fall Creators Update)."
)
from bleak.backends.dotnet.discovery import discover # noqa
from bleak.backends.dotnet.client import BleakClientDotNet as BleakClient # noqa
def cli():
import argparse
def peripheral_didDiscoverCharacteristicsForService_error_(
self, peripheral: CBPeripheral, service: CBService, error: NSError
):
serviceUUID = service.UUID().UUIDString()
if error is not None:
raise BleakError(
"Failed to discover services for service {}: {}".format(
serviceUUID, error
)
)
logger.debug("Characteristics discovered")
self._service_characteristic_discovered_log[serviceUUID] = True
def peripheral_didDiscoverDescriptorsForCharacteristic_error_(
self, peripheral: CBPeripheral, characteristic: CBCharacteristic, error: NSError
):
cUUID = characteristic.UUID().UUIDString()
if error is not None:
raise BleakError(
"Failed to discover descriptors for characteristic {}: {}".format(
cUUID, error
)
)
logger.debug("Descriptor discovered {}".format(cUUID))
self._characteristic_descriptor_log[cUUID] = True
connected = False
if self._services_resolved:
# If services has been resolved, then we assume that we are connected. This is due to
# some issues with getting `is_connected` to give correct response here.
connected = True
else:
for _ in range(5):
await asyncio.sleep(0.2, loop=self.loop)
connected = await self.is_connected()
if connected:
break
if connected:
logger.debug("Connection successful.")
else:
raise BleakError(
"Connection to {0} was not successful!".format(self.address)
)
return connected
def peripheral_didUpdateNotificationStateForCharacteristic_error_(
self, peripheral: CBPeripheral, characteristic: CBCharacteristic, error: NSError
):
cUUID = characteristic.UUID().UUIDString()
if error is not None:
raise BleakError(
"Failed to update the notification status for characteristic {}: {}".format(
cUUID, error
)
)
logger.debug("Character Notify Update")
self._characteristic_notify_log[cUUID] = True
if self._services_resolved:
return self.services
sleep_loop_sec = 0.02
total_slept_sec = 0
while total_slept_sec < 5.0:
properties = await self._get_device_properties()
services_resolved = properties.get("ServicesResolved", False)
if services_resolved:
break
await asyncio.sleep(sleep_loop_sec, loop=self.loop)
total_slept_sec += sleep_loop_sec
if not services_resolved:
raise BleakError("Services discovery error")
logger.debug("Get Services...")
objs = await get_managed_objects(
self._bus, self.loop, self._device_path + "/service"
)
# There is no guarantee that services are listed before characteristics
# Managed Objects dict.
# Need multiple iterations to construct the Service Collection
_chars, _descs = [], []
for object_path, interfaces in objs.items():
logger.debug(utils.format_GATT_object(object_path, interfaces))
if defs.GATT_SERVICE_INTERFACE in interfaces:
service = interfaces.get(defs.GATT_SERVICE_INTERFACE)
).asFuture(self.loop)
except RemoteError as e:
raise BleakError(str(e))
if await self.is_connected():
logger.debug("Connection successful.")
else:
raise BleakError(
"Connection to {0} was not successful!".format(self.address)
)
# Get all services. This means making the actual connection.
await self.get_services()
properties = await self._get_device_properties()
if not properties.get("Connected"):
raise BleakError("Connection failed!")
await self._bus.delMatch(rule_id).asFuture(self.loop)
self._rules["PropChanged"] = await signals.listen_properties_changed(
self._bus, self.loop, self._properties_changed_callback
)
return True
def peripheral_didWriteValueForDescriptor_error_(
self, peripheral: CBPeripheral, descriptor: CBDescriptor, error: NSError
):
dUUID = descriptor.UUID().UUIDString()
if error is not None:
raise BleakError("Failed to write descriptor {}: {}".format(dUUID, error))
logger.debug("Write Descriptor Value")
self._descriptor_write_log[dUUID] = True