Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_import():
"""Test by importing the client and assert correct client by OS."""
if platform.system() == "Linux":
from bleak import BleakClient
assert BleakClient.__name__ == "BleakClientBlueZDBus"
elif platform.system() == "Windows":
from bleak import BleakClient
assert BleakClient.__name__ == "BleakClientDotNet"
elif platform.system() == "Darwin":
from bleak import BleakClient
assert BleakClient.__name__ == "BleakClientCoreBluetooth"
async def show_disconnect_handling(mac_addr: str, loop: asyncio.AbstractEventLoop):
async with BleakClient(mac_addr, loop=loop) as client:
disconnected_event = asyncio.Event()
def disconnect_callback(client):
print("Disconnected callback called!")
loop.call_soon_threadsafe(disconnected_event.set)
client.set_disconnected_callback(disconnect_callback)
print("Sleeping until device disconnects according to BlueZ...")
await disconnected_event.wait()
print("Connected: {0}".format(await client.is_connected()))
await asyncio.sleep(0.5) # Sleep a bit longer to allow _cleanup to remove all BlueZ notifications nicely...
async def run(address, loop, debug=False):
log = logging.getLogger(__name__)
if debug:
import sys
# loop.set_debug(True)
log.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
log.addHandler(h)
async with BleakClient(address, loop=loop) as client:
x = await client.is_connected()
log.info("Connected: {0}".format(x))
for service in client.services:
log.info("[Service] {0}: {1}".format(service.uuid, service.description))
for char in service.characteristics:
if "read" in char.properties:
try:
value = bytes(await client.read_gatt_char(char.uuid))
except Exception as e:
value = str(e).encode()
else:
value = None
log.info(
"\t[Characteristic] {0}: ({1}) | Name: {2}, Value: {3} ".format(
char.uuid, ",".join(char.properties), char.description, value
async def _connect_run(self, address):
self.client = BleakClient(address, loop=self.get_loop())
await self.client.connect()
async def asyncio_loop(self):
# Wait for messages on in_queue
done = False
while not done:
msg = await self.in_queue.get()
if isinstance(msg, tuple):
msg, val = msg
await self.in_queue.task_done()
if msg == 'discover':
print('Awaiting on bleak discover')
devices = await bleak.discover(timeout=1, loop=self.loop)
print('Done Awaiting on bleak discover')
await self.out_queue.put(devices)
elif msg == 'connect':
device = BleakClient(address=val, loop=self.loop)
self.devices.append(device)
await device.connect()
await self.out_queue.put(device)
elif msg == 'tx':
device, char_uuid, msg_bytes = val
await device.write_gatt_char(char_uuid, msg_bytes)
elif msg == 'notify':
device, char_uuid, msg_handler = val
await device.start_notify(char_uuid, msg_handler)
elif msg =='quit':
logging.info('quitting')
for device in self.devices:
await device.disconnect()
done = True
else:
print(f'Unknown message to Bleak: {msg}')
async def run(address, loop, debug=False):
log = logging.getLogger(__name__)
if debug:
import sys
# loop.set_debug(True)
log.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
log.addHandler(h)
async with BleakClient(address, loop=loop) as client:
x = await client.is_connected()
log.info("Connected: {0}".format(x))
for service in client.services:
# service.obj is instance of 'Windows.Devices.Bluetooth.GenericAttributeProfile.GattDeviceService'
log.info(
"[Service] {0}: {1}".format(service.uuid, service.description)
)
for char in service.characteristics:
# char.obj is instance of 'Windows.Devices.Bluetooth.GenericAttributeProfile.GattCharacteristic'
if 'read' in char.properties:
value = bytes(await client.read_gatt_char(char.uuid))
else:
value = None
log.info(
"\t[Characteristic] {0}: ({1}) | Name: {2}, Value: {3} ".format(
async def read_data(loop):
config = ConfigParser()
config.read("band.ini")
device_uuid = config[DEVICE_NAME]["device_uuid"]
device_mac = config[DEVICE_NAME]["device_mac"]
async with BleakClient(device_mac if platform.system() != "Darwin" else device_uuid, loop=loop) as client:
for name, uuid in CHARACTERISTICS.items():
characteristic = await client.read_gatt_char(uuid)
if name == "Battery Level":
value, *_ = struct.unpack("B", characteristic)
print(f"Charge (0-100): {value}")
elif name == "Body Sensor Location":
value, *_ = struct.unpack("B", characteristic)
print(f"Location: {BODY_SENSOR_LOCATIONS[value]}")
else:
print(f"{name}: {characteristic}")
async def run(config, loop):
secret = base64.b64decode(config["secret"])
device_uuid = config["device_uuid"]
device_mac = config["device_mac"]
client_mac = config["client_mac"]
async with BleakClient(device_mac if platform.system() != "Darwin" else device_uuid, loop=loop) as client:
band = Band(loop=loop, client=client, client_mac=client_mac, device_mac=device_mac, key=secret)
await band.connect()
await band.handshake()
# await band.factory_reset()
battery_level = await band.get_battery_level()
logger.info(f"Battery level: {battery_level}")
await band.set_right_wrist(False)
await band.set_rotation_actions()
await band.set_time()
await band.set_locale("en-US", locale_config.MeasurementSystem.Metric)
await band.set_date_format(device_config.DateFormat.YearFirst, device_config.TimeFormat.Hours24)
async def run(address, loop, debug=False):
if debug:
import sys
# loop.set_debug(True)
# l = logging.getLogger("asyncio")
# l.setLevel(logging.DEBUG)
# h = logging.StreamHandler(sys.stdout)
# h.setLevel(logging.DEBUG)
# l.addHandler(h)
async with BleakClient(address, loop=loop) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
system_id = await client.read_gatt_char(SYSTEM_ID_UUID)
print(
"System ID: {0}".format(
":".join(["{:02x}".format(x) for x in system_id[::-1]])
)
)
model_number = await client.read_gatt_char(MODEL_NBR_UUID)
print("Model Number: {0}".format("".join(map(chr, model_number))))
device_name = await client.read_gatt_char(DEVICE_NAME_UUID)
print("Device Name: {0}".format("".join(map(chr, device_name))))
async def run(address, loop, debug=False):
if debug:
import sys
# loop.set_debug(True)
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address, loop=loop) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
await asyncio.sleep(5.0, loop=loop)
await client.stop_notify(CHARACTERISTIC_UUID)