Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def set_data(self, data):
    """Set for the given variable the PDO data.

    A variable that starts on a byte boundary and whose length is a whole
    number of bytes is copied straight into the parent PDO buffer. Any other
    variable is merged into the bytes already in the buffer: the bits that
    belong to this variable are cleared and the new value is OR-ed in at the
    variable's bit offset.

    The parent PDO's ``update()`` is called afterwards in both cases.

    :param bytes data: Value for the PDO variable in the PDO message as :class:`bytes`.
    """
    byte_offset, bit_offset = divmod(self.offset, 8)
    logger.debug("Updating %s to %s in %s",
                 self.name, binascii.hexlify(data), self.pdo_parent.name)

    if bit_offset or self.length % 8:
        # len(self.od) is used as the bit width of the underlying data type.
        od_bits = len(self.od)
        full_mask = (1 << od_bits) - 1
        cur_msg_data = self.pdo_parent.data[byte_offset:byte_offset + od_bits // 8]

        # Need information of the current variable type (unsigned vs signed)
        data_type = self.od.data_type
        if data_type == objectdictionary.BOOLEAN:
            # A boolean type needs to be treated as an U08
            data_type = objectdictionary.UNSIGNED8
        # presumably a struct.Struct for the data type -- only unpack() and
        # pack_into() are relied upon here
        od_struct = self.od.STRUCT_TYPES[data_type]

        cur_value = od_struct.unpack(cur_msg_data)[0]
        # data has to have the same size as the current message data
        new_value = od_struct.unpack(data)[0]

        # Bits occupied by this variable, limited to the width of the type
        # (Python integers are unbounded, so the shift must be masked).
        field_mask = (((1 << self.length) - 1) << bit_offset) & full_mask
        # Clear this variable's bits in the current value, keep the others
        cur_value &= ~field_mask & full_mask

        # Set the new data on the correct position.
        # NOTE(review): new_value is not masked to self.length bits, so a
        # value wider than the field would also change neighbouring bits.
        merged = (new_value << bit_offset) | cur_value
        # pack_into() writes into the buffer in place; its return value is
        # not needed.
        od_struct.pack_into(self.pdo_parent.data, byte_offset, merged)
    else:
        self.pdo_parent.data[byte_offset:byte_offset + len(data)] = data

    self.pdo_parent.update()
def test_subindexes(self):
    """Check that members of an Array can be looked up by subindex."""
    arr = od.Array("Test Array", 0x1000)

    count_entry = od.Variable("Last subindex", 0x1000, 0)
    count_entry.data_type = od.UNSIGNED8
    arr.add_member(count_entry)

    for subindex, member_name in ((1, "Test Variable"), (2, "Test Variable 2")):
        arr.add_member(od.Variable(member_name, 0x1000, subindex))

    # Subindex 3 is never added explicitly; the array is still expected to
    # answer with a generated "Test Variable_3" entry.
    expected_names = (
        "Last subindex",
        "Test Variable",
        "Test Variable 2",
        "Test Variable_3",
    )
    for subindex, expected in enumerate(expected_names):
        self.assertEqual(arr[subindex].name, expected)
def test_unsigned8(self):
    """Check decoding and encoding of a single raw byte as UNSIGNED8."""
    variable = od.Variable("Test UNSIGNED8", 0x1000)
    variable.data_type = od.UNSIGNED8

    decoded = variable.decode_raw(b"\xff")
    self.assertEqual(decoded, 255)

    encoded = variable.encode_raw(254)
    self.assertEqual(encoded, b"\xfe")
try:
import xml.etree.cElementTree as etree
except ImportError:
import xml.etree.ElementTree as etree
import logging
from canopen import objectdictionary
logger = logging.getLogger(__name__)
# Maps data type names to the objectdictionary constants of the same name.
DATA_TYPES = {
    type_name: getattr(objectdictionary, type_name)
    for type_name in (
        "BOOLEAN",
        "INTEGER8",
        "INTEGER16",
        "INTEGER32",
        "UNSIGNED8",
        "UNSIGNED16",
        "UNSIGNED32",
        "REAL32",
        "VISIBLE_STRING",
        "DOMAIN",
    )
}
def import_epf(epf):
"""Import an EPF file.
:param epf:
Either a path to an EPF-file, a file-like object, or an instance of
:class:`xml.etree.ElementTree.Element`.
:returns:
try:
object_type = int(eds.get(section, "ObjectType"), 0)
except NoOptionError:
# DS306 4.6.3.2 object description
# If the keyword ObjectType is missing, this is regarded as
# "ObjectType=0x7" (=VAR).
object_type = VAR
if object_type in (VAR, DOMAIN):
var = build_variable(eds, section, node_id, index)
od.add_object(var)
elif object_type == ARR and eds.has_option(section, "CompactSubObj"):
arr = objectdictionary.Array(name, index)
last_subindex = objectdictionary.Variable(
"Number of entries", index, 0)
last_subindex.data_type = objectdictionary.UNSIGNED8
arr.add_member(last_subindex)
arr.add_member(build_variable(eds, section, node_id, index, 1))
od.add_object(arr)
elif object_type == ARR:
arr = objectdictionary.Array(name, index)
od.add_object(arr)
elif object_type == RECORD:
record = objectdictionary.Record(name, index)
od.add_object(record)
continue
# Match subindexes
match = re.match(r"^([0-9A-Fa-f]{4})[S|s]ub([0-9A-Fa-f]+)$", section)
if match is not None:
index = int(match.group(1), 16)