Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.name = name
self.payload = [UShortInt("Manufacturer ID", endian="little"), Itself("Payload")]
def decode(self, data):
# Warning: Will consume all the data you give it!
for p in self.payload:
data = p.decode(data)
return data
def show(self, depth=0):
print("{}{}:".format(PRINT_INDENT*depth,self.name))
for x in self.payload:
x.show(depth+1)
class RepeatedField(Packet):
def __init__(self, name, subfield_cls, length_field_cls=UIntByte):
self.name = name
self.subfield_cls = subfield_cls
self.length_field = length_field_cls("count of " + name)
self.payload = []
def decode(self, data):
self.payload = []
data = self.length_field.decode(data)
for x in range(self.length_field.val):
field = self.subfield_cls()
data = field.decode(data)
self.payload.append(field)
return data
datalength -= length.val + len(code)
data=data[length.val-len(code):]
if data:
myinfo=IntByte("rssi")
data=myinfo.decode(data)
self.payload.append(myinfo)
return data
def show(self,depth=0):
print("{}{}:".format(PRINT_INDENT*depth,self.name))
for x in self.payload:
x.show(depth+1)
class EIR_Hdr(Packet):
def __init__(self):
self.type= EnumByte("type", 0, {
0x01: "flags",
0x02: "incomplete_list_16_bit_svc_uuids",
0x03: "complete_list_16_bit_svc_uuids",
0x04: "incomplete_list_32_bit_svc_uuids",
0x05: "complete_list_32_bit_svc_uuids",
0x06: "incomplete_list_128_bit_svc_uuids",
0x07: "complete_list_128_bit_svc_uuids",
0x08: "shortened_local_name",
0x09: "complete_local_name",
0x0a: "tx_power_level",
0x0d: "class_of_device",
0x0e: "simple_pairing_hash",
0x0f: "simple_pairing_rand",
0x10: "sec_mgr_tk",
ev = HCI_LE_Meta_Event()
data=ev.decode(data)
self.payload.append(ev)
else:
ev=Itself("Payload")
data=ev.decode(data)
self.payload.append(ev)
return data
def show(self,depth=0):
print("{}HCI Event:".format(PRINT_INDENT*depth))
for x in self.payload:
x.show(depth+1)
class HCI_CC_Event(Packet):
"""Command Complete event"""
def __init__(self):
self.name="Command Completed"
self.payload=[UIntByte("allow pkt"),OgfOcf("cmd"),Itself("resp code")]
def decode(self,data):
for x in self.payload:
data=x.decode(data)
return data
def show(self,depth=0):
for x in self.payload:
x.show(depth+1)
class HCI_LE_Meta_Event(Packet):
def show(self):
return self.type.show()
@property
def val(self):
return self.type.val
@property
def strval(self):
return self.type.strval
def __len__(self):
return len(self.type)
class Adv_Data(Packet):
def __init__(self,name,length):
self.name=name
self.length=length
self.payload=[]
def decode(self,data):
myinfo=NBytes("Service Data uuid",self.length)
data=myinfo.decode(data)
self.payload.append(myinfo)
if data:
myinfo=Itself("Adv Payload")
data=myinfo.decode(data)
self.payload.append(myinfo)
return data
def show(self,depth=0):
if isinstance(aclass,str):
if x.name == aclass:
resu.append(x)
else:
if isinstance(x,aclass):
resu.append(x)
resu+=x.retrieve(aclass)
except:
pass
return resu
#
# Commands
#
class HCI_Command(Packet):
"""Class representing a command HCI packet.
:param ogf: the Op-code Group (6 bits).
:type ogf: bytes
:param ocf: the Op-code Command (10 bits).
:type ocf: bytes
:returns: HCI_Command instance.
:rtype: HCI_Command
"""
def __init__(self,ogf,ocf):
super().__init__(HCI_COMMAND)
self.cmd = OgfOcf("command",ogf,ocf)
self.payload = []
self.payload = []
data = self.length_field.decode(data)
for x in range(self.length_field.val):
field = self.subfield_cls()
data = field.decode(data)
self.payload.append(field)
return data
def show(self, depth=0):
print("{}{}: {}".format(PRINT_INDENT*depth, self.name, self.length_field.val))
for field in self.payload:
field.show(depth+1)
class HCI_LEM_Adv_Report(Packet):
def __init__(self):
self.name="Adv Report"
self.payload=[EnumByte("ev type",0,{0:"generic adv", 3:"no connection adv", 4:"scan rsp"}),
EnumByte("addr type",0,{0:"public", 1:"random"}),
MACAddr("peer"),UIntByte("length")]
def decode(self,data):
for x in self.payload:
data=x.decode(data)
#Now we have a sequence of len, type data with possibly a RSSI byte at the end
datalength = self.payload[-1].val
while datalength > 0:
length=UIntByte("sublen")
if code.val==b"\x02":
ev=RepeatedField("Reports", HCI_LEM_Adv_Report)
data=ev.decode(data)
self.payload.append(ev)
else:
ev=Itself("Payload")
data=ev.decode(data)
self.payload.append(ev)
return data
def show(self,depth=0):
print("{}{}:".format(PRINT_INDENT*depth,self.name))
for x in self.payload:
x.show(depth+1)
class ManufacturerSpecificData(Packet):
def __init__(self, name="Manufacturer Specific Data"):
self.name = name
self.payload = [UShortInt("Manufacturer ID", endian="little"), Itself("Payload")]
def decode(self, data):
# Warning: Will consume all the data you give it!
for p in self.payload:
data = p.decode(data)
return data
def show(self, depth=0):
print("{}{}:".format(PRINT_INDENT*depth,self.name))
for x in self.payload:
x.show(depth+1)
:returns: HCI_Cmd_Reset instance.
:rtype: HCI_Cmd_Reset
"""
def __init__(self):
super(self.__class__, self).__init__(b"\x03",b"\x03")
####
# HCI EVents
####
class HCI_Event(Packet):
def __init__(self,code=0,payload=[]):
super().__init__(HCI_EVENT)
self.payload.append(Byte("code"))
self.payload.append(UIntByte("length"))
def decode(self,data):
data=super().decode(data)
if data is None:
return None
for x in self.payload:
x.decode(data[:len(x)])
data=data[len(x):]
code=self.payload[0]
length=self.payload[1].val
"""Command Complete event"""
def __init__(self):
self.name="Command Completed"
self.payload=[UIntByte("allow pkt"),OgfOcf("cmd"),Itself("resp code")]
def decode(self,data):
for x in self.payload:
data=x.decode(data)
return data
def show(self,depth=0):
for x in self.payload:
x.show(depth+1)
class HCI_LE_Meta_Event(Packet):
def __init__(self):
self.name="LE Meta"
self.payload=[Byte("code")]
def decode(self,data):
for x in self.payload:
data=x.decode(data)
code=self.payload[0]
if code.val==b"\x02":
ev=RepeatedField("Reports", HCI_LEM_Adv_Report)
data=ev.decode(data)
self.payload.append(ev)
else:
ev=Itself("Payload")
data=ev.decode(data)
self.payload.append(ev)