Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _find_object(self, index, subindex):
    """Resolve ``index``/``subindex`` to an entry of the object dictionary.

    :param index: key looked up in ``self.object_dictionary``.
    :param subindex: key looked up inside the entry when that entry is not
        an ``objectdictionary.Variable``; ignored otherwise.
    :return: the entry itself when it is a ``Variable``, else its
        ``subindex`` member.
    :raises SdoAbortedError: code 0x06020000 when ``index`` does not exist,
        code 0x06090011 when ``subindex`` does not exist in the entry.
    """
    dictionary = self.object_dictionary
    if index not in dictionary:
        # Index does not exist
        raise SdoAbortedError(0x06020000)

    entry = dictionary[index]
    if isinstance(entry, objectdictionary.Variable):
        # Plain variable: there is nothing to descend into.
        return entry

    # Group or array
    if subindex not in entry:
        # Subindex does not exist
        raise SdoAbortedError(0x06090011)
    return entry[subindex]
def _find_object(self, index, subindex):
    """Return the object dictionary entry addressed by ``index``/``subindex``.

    An entry that is an ``objectdictionary.Variable`` is returned directly;
    any other entry is treated as a container and indexed with ``subindex``.

    Raises ``SdoAbortedError(0x06020000)`` when ``index`` is not in
    ``self.object_dictionary`` and ``SdoAbortedError(0x06090011)`` when the
    container has no ``subindex`` member.

    NOTE(review): an identical ``_find_object`` definition appears twice in
    this file; confirm whether one of them can be dropped.
    """
    known_objects = self.object_dictionary
    if index not in known_objects:
        # Index does not exist
        raise SdoAbortedError(0x06020000)

    found = known_objects[index]
    is_container = not isinstance(found, objectdictionary.Variable)
    if is_container:
        # Group or array
        if subindex not in found:
            # Subindex does not exist
            raise SdoAbortedError(0x06090011)
        found = found[subindex]
    return found
# NOTE(review): this is a fragment of a method body -- the enclosing `def`
# line is not part of this excerpt and the original indentation is lost.
# It writes every variable of self.map into self.map_array, then writes the
# number of entries, refreshes the data size and, if self.enabled is set,
# writes the COB-ID and subscribes to it.
#
# Mapping entries are written starting at subindex 1; subindex 0 receives
# the entry count after the loop.
subindex = 1
for var in self.map:
logger.info("Writing %s (0x%X:%d, %d bits) to PDO map",
var.name, var.index, var.subindex, var.length)
# Each mapping is packed into a single integer. With the `curtis_hack`
# attribute set on the node the fields are packed as
# index | subindex << 16 | length << 24; otherwise as
# index << 16 | subindex << 8 | length.
if hasattr(self.pdo_node.node, "curtis_hack") and self.pdo_node.node.curtis_hack: # Curtis HACK: mixed up field order
self.map_array[subindex].raw = (var.index |
var.subindex << 16 |
var.length << 24)
else:
self.map_array[subindex].raw = (var.index << 16 |
var.subindex << 8 |
var.length)
subindex += 1
# Write the number of mapped entries to subindex 0.
try:
self.map_array[0].raw = len(self.map)
except SdoAbortedError as e:
# WORKAROUND for broken implementations: If the array
# number-of-entries parameter is not writable, we have already
# generated the required number of mappings above.
if e.code != 0x06010002:
# Abort codes other than "Attempt to write a read-only
# object" should still be reported.
raise
# Presumably recomputes the PDO payload size from the new mapping --
# TODO confirm against _update_data_size().
self._update_data_size()
# Only when the PDO is marked enabled: write the COB-ID to subindex 1 of
# the communication record and register on_message for that COB-ID.
if self.enabled:
logger.info("Enabling PDO")
self.com_record[1].raw = self.cob_id
self.pdo_node.network.subscribe(self.cob_id, self.on_message)
def get_data(self, index, subindex, check_readable=False):
    """Return the raw data for the object at ``index``/``subindex``.

    Sources are tried in this order and the first hit wins:

    1. the registered read callbacks (first non-``None`` result, encoded
       with ``obj.encode_raw``),
    2. data previously stored in ``self.data_store`` (returned as stored),
    3. the object's ``value`` (encoded with ``obj.encode_raw``),
    4. the object's ``default`` (encoded with ``obj.encode_raw``).

    :param index: object index, passed to ``self._find_object``.
    :param subindex: object subindex, passed to ``self._find_object``.
    :param check_readable: when true, refuse objects whose ``readable``
        attribute is false.
    :return: the raw data from the first source that provides one.
    :raises SdoAbortedError: code 0x06010001 when ``check_readable`` is set
        and the object is not readable; code 0x060A0023 when no source
        provides a value; plus whatever ``self._find_object`` raises.
    """
    obj = self._find_object(index, subindex)

    if check_readable and not obj.readable:
        raise SdoAbortedError(0x06010001)

    # Try callback
    for callback in self._read_callbacks:
        result = callback(index=index, subindex=subindex, od=obj)
        if result is not None:
            return obj.encode_raw(result)

    # Try stored data
    try:
        return self.data_store[index][subindex]
    except KeyError:
        # Try ParameterValue in EDS
        if obj.value is not None:
            return obj.encode_raw(obj.value)
        # Try default value. This must encode obj.default: the callback
        # `result` is None (or was never assigned) once we get here.
        if obj.default is not None:
            return obj.encode_raw(obj.default)

    # Resource not available
    logger.info("Resource unavailable for 0x%X:%d", index, subindex)
    raise SdoAbortedError(0x060A0023)
def set_data(self, index, subindex, data, check_writable=False):
    """Notify the write callbacks and store ``data`` for ``index``/``subindex``.

    :param index: object index, passed to ``self._find_object``.
    :param subindex: object subindex, passed to ``self._find_object``.
    :param data: new raw data; a ``bytes`` copy of it is stored.
    :param check_writable: when true, refuse objects whose ``writable``
        attribute is false.
    :raises SdoAbortedError: code 0x06010002 when ``check_writable`` is set
        and the object is not writable; plus whatever
        ``self._find_object`` raises.
    """
    target = self._find_object(index, subindex)

    write_refused = check_writable and not target.writable
    if write_refused:
        raise SdoAbortedError(0x06010002)

    # Try callbacks
    for notify in self._write_callbacks:
        notify(index=index, subindex=subindex, od=target, data=data)

    # Store data: create the per-index bucket first, then convert and save.
    bucket = self.data_store.setdefault(index, {})
    bucket[subindex] = bytes(data)