How to use the aioblescan.plugins.EddyStone class in aioblescan

To help you get started, we’ve selected a few aioblescan examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github frawau / aioblescan / tests / test_eddystone.py View on Github external
def test_eddystone_uid(self):
        """Decode a captured Eddystone UID advertisement and compare every field."""
        # Captured HCI event bytes used as the fixture for this test.
        raw_event = b'\x04>)\x02\x01\x03\x01\xdc)e\x90U\xf1\x1d\x02\x01\x06\x03\x03\xaa\xfe\x15\x16\xaa\xfe\x00\xf6\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x00\x00\x00\x00\x00X\xb6'
        event = aioblescan.HCI_Event()
        event.decode(raw_event)
        decoded = EddyStone().decode(event)
        # The name space is compared as 10 big-endian bytes, the instance as 6.
        expected = {
            'tx_power': -10,
            'rssi': -74,
            'name space': (0x63).to_bytes(10, byteorder="big"),
            'instance': (0x58).to_bytes(6, byteorder="big"),
            'mac address': 'f1:55:90:65:29:dc',
        }
        self.assertDictEqual(decoded, expected)
github frawau / aioblescan / tests / test_eddystone.py View on Github external
def test_eddystone_url(self):
        """Decode a captured Eddystone URL advertisement and compare every field."""
        # Captured HCI event bytes used as the fixture for this test.
        raw_event = b'\x04>)\x02\x01\x03\x01\xdc)e\x90U\xf1\x1d\x02\x01\x06\x03\x03\xaa\xfe\x15\x16\xaa\xfe\x10\xf6\x03makecode\x00#about\xb5'
        event = aioblescan.HCI_Event()
        event.decode(raw_event)
        decoded = EddyStone().decode(event)
        # The decoder is expected to hand back the fully expanded URL string.
        expected = {
            'mac address': 'f1:55:90:65:29:dc',
            'tx_power': -10,
            'url': 'https://makecode.com/#about',
            'rssi': -75,
        }
        self.assertDictEqual(decoded, expected)
github frawau / aioblescan / aioblescan / __main__.py View on Github external
#create a connection with the raw socket
#This used to work but now requires a STREAM socket.
#fac=event_loop.create_connection(aiobs.BLEScanRequester,sock=mysocket)
#Thanks to martensjacobs for this fix
# NOTE(review): _create_connection_transport is a private event-loop method
# (leading underscore), so it may change between Python versions -- confirm.
fac=event_loop._create_connection_transport(mysocket,aiobs.BLEScanRequester,None,None)
#Start it
# Run the loop until "fac" completes; it yields the (conn, btctrl) pair.
conn,btctrl = event_loop.run_until_complete(fac)
#Attach your processing
btctrl.process=my_process
# Optional advertising: only configured when opts.advertise is truthy.
if opts.advertise:
    # Advertising is switched off first, reconfigured, then switched back on.
    command = aiobs.HCI_Cmd_LE_Advertise(enable=False)
    btctrl.send_command(command)
    # opts.advertise is used as both the minimum and the maximum interval.
    command = aiobs.HCI_Cmd_LE_Set_Advertised_Params(interval_min=opts.advertise,interval_max=opts.advertise)
    btctrl.send_command(command)
    # Build the advertised message, passing the URL only when one was given.
    if opts.url:
        myeddy = EddyStone(param=opts.url)
    else:
        myeddy = EddyStone()
    # Truthiness check: a txpower of 0 leaves myeddy.power untouched.
    if opts.txpower:
        myeddy.power=opts.txpower
    command = aiobs.HCI_Cmd_LE_Set_Advertised_Msg(msg=myeddy)
    btctrl.send_command(command)
    command = aiobs.HCI_Cmd_LE_Advertise(enable=True)
    btctrl.send_command(command)

#Probe
btctrl.send_scan_request()
try:
    # event_loop.run_until_complete(coro)
    # Run until interrupted; Ctrl-C surfaces here as KeyboardInterrupt.
    event_loop.run_forever()
except KeyboardInterrupt:
    print('keyboard interrupt')
github frawau / aioblescan / aioblescan / plugins / ruuviweather.py View on Github external
def decode(self,packet):
        """Collect sensor readings from a decoded packet into a dict.

        The Eddystone decoder is tried first. When it returns None, the
        manufacturer-specific payload is inspected instead and, if its
        first three bytes match the expected signature, the readings are
        parsed into ``result``.

        NOTE(review): the visible excerpt ends before any return statement
        and does not show how a non-None Eddystone result is handled.

        :param packet: decoded packet object exposing ``retrieve(name)``
        """
        #Look for Ruuvi tag URL and decode it
        result={}
        rssi=packet.retrieve("rssi")
        if rssi:
            # Use the last "rssi" entry retrieved from the packet.
            result["rssi"]=rssi[-1].val
        url=EddyStone().decode(packet)
        if url is None:
            # "url" is reused here to hold the manufacturer-specific payload entries.
            url=packet.retrieve("Payload for mfg_specific_data")
            if url:
                val=url[0].val
                # presumably 0x99 0x04 is the manufacturer id and 0x03 the
                # data-format byte -- TODO confirm against the Ruuvi spec
                if val[0]==0x99 and val[1]==0x04 and val[2]==0x03:
                    #Looks just right
                    result["mac address"]=packet.retrieve("peer")[0].val
                    # Drop the first two bytes; val[0] is now the 0x03 byte
                    # matched above, so the readings start at val[1].
                    val=val[2:]
                    # Raw byte is halved to obtain the humidity value.
                    result["humidity"]=val[1]/2.0
                    # get_temp is defined outside this excerpt; it receives two raw bytes.
                    result["temperature"]=get_temp(val[2], val[3])
                    # Two big-endian bytes plus a fixed offset of 50000.
                    result["pressure"]=int.from_bytes(val[4:6],"big")+50000
                    # Each axis is a signed 16-bit big-endian integer.
                    dx=int.from_bytes(val[6:8],"big",signed=True)
                    dy=int.from_bytes(val[8:10],"big",signed=True)
                    dz=int.from_bytes(val[10:12],"big",signed=True)
                    # Euclidean magnitude of the three-axis vector.
                    length=sqrt(dx**2 + dy**2 + dz**2)
                    result["accelerometer"]=(dx,dy,dz,length)
github frawau / aioblescan / aioblescan / __main__.py View on Github external
ev=aiobs.HCI_Event()
    xx=ev.decode(data)
    if opts.mac:
        goon = False
        mac= ev.retrieve("peer")
        for x in mac:
            if x.val in opts.mac:
                goon=True
                break
        if not goon:
            return

    if opts.raw:
        print("Raw data: {}".format(ev.raw_data))
    if opts.eddy:
        xx=EddyStone().decode(ev)
        if xx:
            print("Google Beacon {}".format(xx))
    elif opts.ruuvi:
        xx=RuuviWeather().decode(ev)
        if xx:
            print("Weather info {}".format(xx))
    elif opts.pebble:
        xx=BlueMaestro().decode(ev)
        if xx:
            print("Pebble info {}".format(xx))
    else:
        ev.show(0)
github ukBaz / python-bluezero / bluezero / observer.py View on Github external
def _process_packet(data):
    """
    Decode a raw BLE packet and forward any Eddystone payload.

    The packet is always decoded and logged. ``packet_callback`` is only
    invoked when it is set and the Eddystone decoder returns a truthy
    result.

    :param data: BLE Packet data
    :return: None
    """
    logging.debug('Packet found')
    event = aioblescan.HCI_Event()
    event.decode(data)
    logging.debug('Using the following callback %s', packet_callback)

    # Nothing to notify: skip the Eddystone decode entirely.
    if packet_callback is None:
        return

    beacon = EddyStone().decode(event)
    if not beacon:
        # Not an Eddystone packet (or nothing decodable) -- stay silent.
        return
    packet_callback(beacon)
github frawau / aioblescan / aioblescan / __main__.py View on Github external
#fac=event_loop.create_connection(aiobs.BLEScanRequester,sock=mysocket)
#Thanks to martensjacobs for this fix
# NOTE(review): _create_connection_transport is a private event-loop method
# (leading underscore), so it may change between Python versions -- confirm.
fac=event_loop._create_connection_transport(mysocket,aiobs.BLEScanRequester,None,None)
#Start it
# Run the loop until "fac" completes; it yields the (conn, btctrl) pair.
conn,btctrl = event_loop.run_until_complete(fac)
#Attach your processing
btctrl.process=my_process
# Optional advertising: only configured when opts.advertise is truthy.
if opts.advertise:
    # Advertising is switched off first, reconfigured, then switched back on.
    command = aiobs.HCI_Cmd_LE_Advertise(enable=False)
    btctrl.send_command(command)
    # opts.advertise is used as both the minimum and the maximum interval.
    command = aiobs.HCI_Cmd_LE_Set_Advertised_Params(interval_min=opts.advertise,interval_max=opts.advertise)
    btctrl.send_command(command)
    # Build the advertised message, passing the URL only when one was given.
    if opts.url:
        myeddy = EddyStone(param=opts.url)
    else:
        myeddy = EddyStone()
    # Truthiness check: a txpower of 0 leaves myeddy.power untouched.
    if opts.txpower:
        myeddy.power=opts.txpower
    command = aiobs.HCI_Cmd_LE_Set_Advertised_Msg(msg=myeddy)
    btctrl.send_command(command)
    command = aiobs.HCI_Cmd_LE_Advertise(enable=True)
    btctrl.send_command(command)

#Probe
btctrl.send_scan_request()
try:
    # event_loop.run_until_complete(coro)
    # Run until interrupted; Ctrl-C surfaces here as KeyboardInterrupt.
    event_loop.run_forever()
except KeyboardInterrupt:
    print('keyboard interrupt')
finally:
    # Runs on every exit path from the try block. The excerpt ends after
    # this message, so any actual loop shutdown is not shown here.
    print('closing event loop')