Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_eddystone_uid(self):
    """Decode a captured Eddystone UID advertisement and check every field."""
    # Captured HCI event bytes used as the fixture for this test.
    raw_frame = b'\x04>)\x02\x01\x03\x01\xdc)e\x90U\xf1\x1d\x02\x01\x06\x03\x03\xaa\xfe\x15\x16\xaa\xfe\x00\xf6\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x00\x00\x00\x00\x00X\xb6'
    event = aioblescan.HCI_Event()
    event.decode(raw_frame)
    decoded = EddyStone().decode(event)
    # Name space and instance are compared as fixed-width big-endian bytes
    # (10 and 6 bytes respectively).
    expected = {
        'tx_power': -10,
        'rssi': -74,
        'name space': (0x63).to_bytes(10, byteorder="big"),
        'instance': (0x58).to_bytes(6, byteorder="big"),
        'mac address': 'f1:55:90:65:29:dc',
    }
    self.assertDictEqual(decoded, expected)
def test_eddystone_url(self):
    """Decode a captured Eddystone URL advertisement and check every field."""
    # Captured HCI event bytes used as the fixture for this test.
    raw_frame = b'\x04>)\x02\x01\x03\x01\xdc)e\x90U\xf1\x1d\x02\x01\x06\x03\x03\xaa\xfe\x15\x16\xaa\xfe\x10\xf6\x03makecode\x00#about\xb5'
    event = aioblescan.HCI_Event()
    event.decode(raw_frame)
    decoded = EddyStone().decode(event)
    expected = {
        'mac address': 'f1:55:90:65:29:dc',
        'tx_power': -10,
        'url': 'https://makecode.com/#about',
        'rssi': -75,
    }
    self.assertDictEqual(decoded, expected)
# Create a connection on top of the raw socket.
# event_loop.create_connection(aiobs.BLEScanRequester, sock=mysocket) used to
# work but now requires a STREAM socket, so the loop's private transport
# factory is called directly (thanks to martensjacobs for this fix).
fac = event_loop._create_connection_transport(
    mysocket, aiobs.BLEScanRequester, None, None
)
# Start it: drive the coroutine to completion; it yields the pair bound to
# conn and btctrl.
conn, btctrl = event_loop.run_until_complete(fac)
# Attach the packet-processing callback.
btctrl.process = my_process

# NOTE(review): the nesting below was reconstructed from a copy that had lost
# its indentation -- confirm that the whole advertising sequence belongs under
# `if opts.advertise:`.
if opts.advertise:
    # Advertising is disabled first, then reconfigured, then re-enabled.
    btctrl.send_command(aiobs.HCI_Cmd_LE_Advertise(enable=False))
    btctrl.send_command(
        aiobs.HCI_Cmd_LE_Set_Advertised_Params(
            interval_min=opts.advertise,
            interval_max=opts.advertise,
        )
    )
    # Build the advertised message, from the given URL when there is one.
    myeddy = EddyStone(param=opts.url) if opts.url else EddyStone()
    if opts.txpower:
        myeddy.power = opts.txpower
    btctrl.send_command(aiobs.HCI_Cmd_LE_Set_Advertised_Msg(msg=myeddy))
    btctrl.send_command(aiobs.HCI_Cmd_LE_Advertise(enable=True))

# Probe
btctrl.send_scan_request()
try:
    # Runs until interrupted from the keyboard.
    event_loop.run_forever()
except KeyboardInterrupt:
    print('keyboard interrupt')
def decode(self,packet):
# NOTE(review): this copy has lost its indentation and appears to be cut off --
# no return statement is visible, so what the method hands back cannot be
# determined from here. Confirm against the complete file.
#Look for Ruuvi tag URL and decode it
# Decoded fields are collected in this dict, keyed by field name.
result={}
rssi=packet.retrieve("rssi")
if rssi:
# Only the last "rssi" entry is kept.
result["rssi"]=rssi[-1].val
# The Eddystone decoder is tried first.
url=EddyStone().decode(packet)
if url is None:
# No Eddystone data: read the manufacturer-specific payload instead.
url=packet.retrieve("Payload for mfg_specific_data")
if url:
val=url[0].val
# The payload must start with the bytes 0x99 0x04 0x03.
# NOTE(review): presumably a manufacturer id followed by a data-format byte --
# confirm against the sensor's format specification.
if val[0]==0x99 and val[1]==0x04 and val[2]==0x03:
#Looks just right
result["mac address"]=packet.retrieve("peer")[0].val
# Drop the first two bytes; every index below is relative to the 0x03 byte.
val=val[2:]
# The raw humidity byte is halved.
result["humidity"]=val[1]/2.0
result["temperature"]=get_temp(val[2], val[3])
# Two big-endian bytes plus a fixed offset of 50000.
result["pressure"]=int.from_bytes(val[4:6],"big")+50000
# Each acceleration component is a signed big-endian two-byte integer.
dx=int.from_bytes(val[6:8],"big",signed=True)
dy=int.from_bytes(val[8:10],"big",signed=True)
dz=int.from_bytes(val[10:12],"big",signed=True)
# Euclidean length of the (dx, dy, dz) vector.
length=sqrt(dx**2 + dy**2 + dz**2)
result["accelerometer"]=(dx,dy,dz,length)
# NOTE(review): this looks like the body of a packet handler whose `def` line
# is not visible here (it reads `data` and `opts` and uses `return`); the
# indentation was lost in this copy. Confirm against the complete file.
# Decode the raw bytes into an HCI event.
ev=aiobs.HCI_Event()
xx=ev.decode(data)
# Optional MAC filter: stop unless one of the event's "peer" values is listed
# in opts.mac.
if opts.mac:
goon = False
mac= ev.retrieve("peer")
for x in mac:
if x.val in opts.mac:
goon=True
break
if not goon:
return
# Optionally dump the undecoded bytes.
if opts.raw:
print("Raw data: {}".format(ev.raw_data))
# Decode with the decoder selected by the options and print only non-empty
# results. NOTE(review): the elif/else chain presumably pairs with
# `if opts.eddy:`, so the full event is shown when no decoder is selected.
if opts.eddy:
xx=EddyStone().decode(ev)
if xx:
print("Google Beacon {}".format(xx))
elif opts.ruuvi:
xx=RuuviWeather().decode(ev)
if xx:
print("Weather info {}".format(xx))
elif opts.pebble:
xx=BlueMaestro().decode(ev)
if xx:
print("Pebble info {}".format(xx))
else:
ev.show(0)
def _process_packet(data):
    """
    Decode one BLE packet and forward any Eddystone data it carries.

    The packet is decoded as an HCI event. When ``packet_callback`` is set
    and the event yields Eddystone data, the callback is called with it.

    :param data: BLE Packet data
    :return: None
    """
    logging.debug('Packet found')
    hci_event = aioblescan.HCI_Event()
    hci_event.decode(data)
    logging.debug('Using the following callback %s', packet_callback)
    if packet_callback is None:
        # Nobody to notify, so the Eddystone decoder is not run.
        return
    beacon = EddyStone().decode(hci_event)
    if beacon:
        packet_callback(beacon)
# NOTE(review): this fragment has lost its indentation and is cut off after the
# first statement of the `finally:` clause -- confirm nesting and any cleanup
# against the complete file.
# Earlier call, kept for reference:
#fac=event_loop.create_connection(aiobs.BLEScanRequester,sock=mysocket)
#Thanks to martensjacobs for this fix
# The loop's private transport factory is called directly with the raw socket.
fac=event_loop._create_connection_transport(mysocket,aiobs.BLEScanRequester,None,None)
#Start it
conn,btctrl = event_loop.run_until_complete(fac)
#Attach your processing
btctrl.process=my_process
# Optional advertising: disable, set the interval, set the message, re-enable.
if opts.advertise:
command = aiobs.HCI_Cmd_LE_Advertise(enable=False)
btctrl.send_command(command)
# The same value is used for both the minimum and the maximum interval.
command = aiobs.HCI_Cmd_LE_Set_Advertised_Params(interval_min=opts.advertise,interval_max=opts.advertise)
btctrl.send_command(command)
# Build the advertised message, from the given URL when there is one.
if opts.url:
myeddy = EddyStone(param=opts.url)
else:
myeddy = EddyStone()
# Override the message's power only when a value was supplied.
if opts.txpower:
myeddy.power=opts.txpower
command = aiobs.HCI_Cmd_LE_Set_Advertised_Msg(msg=myeddy)
btctrl.send_command(command)
command = aiobs.HCI_Cmd_LE_Advertise(enable=True)
btctrl.send_command(command)
#Probe
btctrl.send_scan_request()
try:
# event_loop.run_until_complete(coro)
# Runs until interrupted from the keyboard.
event_loop.run_forever()
except KeyboardInterrupt:
print('keyboard interrupt')
finally:
print('closing event loop')