Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def set_uint(self, value, nbits, bitpos):
import bitstring
if nbits // NBITS_PER_BYTE == 0:
bins = bitstring.Bits(uint=value, length=nbits)
else:
bins = bitstring.Bits(uintbe=value, length=24)
self.bit_stream[bitpos: bitpos + nbits] = bins
def match_to_bits(self, key, val):
"""convert match fields and masks to bits objects.
this allows for masked matching. Converting all match fields to the
same object simplifies things (eg __str__).
"""
if isinstance(val, Bits):
return val
def _val_to_bits(conv, val, length):
if val is -1:
return Bits(int=-1, length=length)
return Bits(bytes=conv(val), length=length)
if key in self.MAC_MATCH_FIELDS:
return _val_to_bits(addrconv.mac.text_to_bin, val, 48)
if key in self.IPV4_MATCH_FIELDS:
return _val_to_bits(addrconv.ipv4.text_to_bin, val, 32)
if key in self.IPV6_MATCH_FIELDS:
return _val_to_bits(addrconv.ipv6.text_to_bin, val, 128)
return Bits(int=int(val), length=64)
def test_StripCRCRemovesCRCBits(self):
testWord = bitstring.Bits("0x0564B34483000100DF89")
result = DataLinkTranslator.StripCRCBits(testWord)
assert result.uint == bitstring.Bits("0x0564B34483000100").uint , "Did not Grab the right bits, grabbed {}".format(result.hex)
def testLongBool(self):
a = Bits(1000)
b = bool(a)
self.assertTrue(b is False)
def testPadding1(self):
value = Bits(bin='1111111')
expected = Bits(bin='11111110')
uut = self.get_default_encoder()
self.assertEqual(uut.encode(value), expected)
def testCreationFromDataWithOffset(self):
s1 = Bits(bytes=b'\x0b\x1c\x2f', offset=0, length=20)
s2 = Bits(bytes=b'\xa0\xb1\xC2', offset=4)
self.assertEqual((s2.len, s2.hex), (20, '0b1c2'))
self.assertEqual((s1.len, s1.hex), (20, '0b1c2'))
self.assertTrue(s1 == s2)
def test_DataLayerControlSliceGrabsRightBits(self):
testWord = bitstring.Bits("0x0564B34483000100")
result = DataLinkTranslator.DataLayerControl(testWord)
assert result.uint == bitstring.Bits("0x44").uint , "Did not grab right control Octet, grabbed {},\n should be {}".format(result.bin, bitstring.Bits("0x44").bin)
def testCreationFromInt(self):
s = Bits(int=0, length=4)
self.assertEqual(s.bin, '0000')
s = Bits(int=1, length=2)
self.assertEqual(s.bin, '01')
s = Bits(int=-1, length=11)
self.assertEqual(s.bin, '11111111111')
s = Bits(int=12, length=7)
self.assertEqual(s.int, 12)
s = Bits(int=-243, length=108)
self.assertEqual((s.int, s.length), (-243, 108))
for length in range(6, 10):
for value in range(-17, 17):
s = Bits(int=value, length=length)
self.assertEqual((s.int, s.length), (value, length))
s = Bits(int=10, length=8)
appMask = bitstring.Bits("0x000000000000000F")
seqMask = bitstring.Bits("0x000000000000000E")
seqShift = 0
consecutiveMask = bitstring.Bits("0x0000000000000020")
consecutiveShift = 5
unsolicitedMask = bitstring.Bits("0x0000000000000010")
unsolicitedShift = 4
firstMask = bitstring.Bits("0x0000000000000040")
firstShift = 6
finalMask = bitstring.Bits("0x0000000000000080")
finalShift = 7
#function code masks
#81 through 83 (hex) indicates response
funcCodeMask = bitstring.Bits("0x000000000000FF00")
funcCodeShift = 8
#only for response from outstations
lsbMask = bitstring.Bits("0x00000000000F0000")
msbMask = bitstring.Bits("0x0000000000F00000")
lsbShift = 12
msbShift = 16
def DNP3(message):
#bytes are recieved in reverse byte order
print (message)
binMessage = bitstring.BitArray(message)
numMessage = int(message,16)
def from_bytes(cls, bitstream):
'''
Parse the given packet and update properties accordingly
'''
packet = cls()
# Convert to ConstBitStream (if not already provided)
if not isinstance(bitstream, ConstBitStream):
if isinstance(bitstream, Bits):
bitstream = ConstBitStream(auto=bitstream)
else:
bitstream = ConstBitStream(bytes=bitstream)
# Read the type
type_nr = bitstream.read('uint:4')
if type_nr != packet.message_type:
msg = 'Invalid bitstream for a {0} packet'
class_name = packet.__class__.__name__
raise ValueError(msg.format(class_name))
# Read the flags
has_xtr_site_id = bitstream.read('bool')
# Skip reserved bits
packet._reserved1 = bitstream.read(19)