Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
LOG.info("Loading {} from Fermentrack".format(this_tilt))
tilts[this_tilt].load_obj_from_fermentrack()
event_loop = asyncio.get_event_loop()
# First create and configure a raw socket
try:
mysocket = aiobs.create_bt_socket(mydev)
except OSError as e:
LOG.error("Unable to create socket - {}".format(e))
exit(1)
# create a connection with the raw socket (Uses _create_connection_transport instead of create_connection as this now
# requires a STREAM socket) - previously was fac=event_loop.create_connection(aiobs.BLEScanRequester,sock=mysocket)
fac = event_loop._create_connection_transport(mysocket, aiobs.BLEScanRequester, None, None)
conn, btctrl = event_loop.run_until_complete(fac) # Start the bluetooth control loop
btctrl.process=processBLEBeacon # Attach the handler to the bluetooth control loop
# Begin probing
btctrl.send_scan_request()
try:
event_loop.run_forever()
except KeyboardInterrupt:
if verbose:
LOG.info('Keyboard interrupt')
finally:
if verbose:
LOG.info('Closing event loop')
btctrl.stop_scan_request()
command = aiobs.HCI_Cmd_LE_Advertise(enable=False)
btctrl.send_command(command)
def run(self):
"""Run HCIdump thread."""
_LOGGER.debug("HCIdump thread: Run")
try:
mysocket = aiobs.create_bt_socket(self._interface)
except OSError as error:
_LOGGER.error("HCIdump thread: OS error: %s", error)
else:
self._event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._event_loop)
fac = self._event_loop._create_connection_transport(
mysocket, aiobs.BLEScanRequester, None, None
)
_LOGGER.debug("HCIdump thread: Connection")
conn, btctrl = self._event_loop.run_until_complete(fac)
_LOGGER.debug("HCIdump thread: Connected")
btctrl.process = self.process_hci_events
btctrl.send_command(
aiobs.HCI_Cmd_LE_Set_Scan_Params(scan_type=self._active)
)
btctrl.send_scan_request()
_LOGGER.debug("HCIdump thread: start main event_loop")
try:
self._event_loop.run_forever()
finally:
_LOGGER.debug("HCIdump thread: main event_loop stopped, finishing")
btctrl.stop_scan_request()
conn.close()
"""
Provide a callback for 'on_data'. The callback will be run whenever
an Eddystone packet is detected.
:param on_data: A function to be called on Eddystone packet
:return: None
"""
global packet_callback
mydev = 0
event_loop = asyncio.get_event_loop()
# First create and configure a raw socket
mysocket = aioblescan.create_bt_socket(mydev)
# create a connection with the raw socket
fac = event_loop.create_connection(aioblescan.BLEScanRequester,
sock=mysocket)
# Start it
conn, btctrl = event_loop.run_until_complete(fac)
# Attach your processing
btctrl.process = _process_packet
packet_callback = on_data
# Probe
btctrl.send_scan_request()
try:
# event_loop.run_until_complete(coro)
event_loop.run_forever()
except KeyboardInterrupt:
print('keyboard interrupt')
finally:
print('closing event loop')
xx=BlueMaestro().decode(ev)
if xx:
print("Pebble info {}".format(xx))
else:
ev.show(0)
event_loop = asyncio.get_event_loop()
#First create and configure a raw socket
mysocket = aiobs.create_bt_socket(opts.device)
#create a connection with the raw socket
#This used to work but now requires a STREAM socket.
#fac=event_loop.create_connection(aiobs.BLEScanRequester,sock=mysocket)
#Thanks to martensjacobs for this fix
fac=event_loop._create_connection_transport(mysocket,aiobs.BLEScanRequester,None,None)
#Start it
conn,btctrl = event_loop.run_until_complete(fac)
#Attach your processing
btctrl.process=my_process
if opts.advertise:
command = aiobs.HCI_Cmd_LE_Advertise(enable=False)
btctrl.send_command(command)
command = aiobs.HCI_Cmd_LE_Set_Advertised_Params(interval_min=opts.advertise,interval_max=opts.advertise)
btctrl.send_command(command)
if opts.url:
myeddy = EddyStone(param=opts.url)
else:
myeddy = EddyStone()
if opts.txpower:
myeddy.power=opts.txpower
command = aiobs.HCI_Cmd_LE_Set_Advertised_Msg(msg=myeddy)