Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rows.prepend(row)
break
self.__parse_row(row, rows, types)
self.__complete_dynamic(types)
def __parse_row(self, row, rows, types):
    """Create a DynamicMessageField for ``row`` and register it on this message.

    ``rows`` and ``types`` are handed to the field constructor unchanged,
    together with this message's ``_log``.
    """
    register = self._add_field
    register(DynamicMessageField(self._log, row, rows, types))
def __complete_dynamic(self, types):
    """Ask every dynamic field of this message to complete itself.

    Each value in ``self._profile_to_field`` whose ``is_dynamic`` attribute
    is truthy has its ``_complete_dynamic(self, types)`` hook called, in the
    mapping's iteration order.
    """
    # these may be forward references
    # (lazy generator: each field is tested immediately before it is completed)
    pending = (candidate for candidate in self._profile_to_field.values()
               if candidate.is_dynamic)
    for dynamic_field in pending:
        dynamic_field._complete_dynamic(self, types)
class Header(Message):
    """The 'HEADER' message, with one field per entry in HEADER_FIELDS."""

    def __init__(self, log, types):
        """Register a MessageField for each (name, size, base_type) entry.

        The field number is the entry's position in HEADER_FIELDS; the size
        element of each entry is not used here.
        """
        super().__init__(log, 'HEADER', number=HEADER_GLOBAL_TYPE)
        for position, (field_name, _size, type_name) in enumerate(HEADER_FIELDS):
            field_type = types.profile_to_type(type_name)
            self._add_field(MessageField(log, field_name, position, None, field_type))

    def _parse_field(self, field, data, count, endian, references, message):
        """Parse a header field, skipping 'checksum' when header_size is 12.

        Returns ``(None, None)`` for the skipped checksum; every other case
        is delegated to the parent implementation.
        """
        # presumably the 12-byte header form carries no checksum - confirm
        skip_checksum = field.name == 'checksum' and references['header_size'] == 12
        if skip_checksum:
            return None, None
        return super()._parse_field(field, data, count, endian, references, message)
class Missing(Message):
def __init__(self, log, number):
if field.field:
name, value = self._parse_field(
field.field, bytes, field.count, defn.endian, references, self)
else:
name = str(field.number)
value = (field.base_type.parse(bytes, field.count, defn.endian), None)
if name in defn.references:
references[name] = value
yield name, value
def _parse_field(self, field, bytes, count, endian, references, message):
    """Parse one field by delegating to ``field.parse``.

    All arguments after ``field`` are forwarded unchanged and the result of
    ``field.parse`` is returned as-is.
    """
    # allow interception for optional field in header
    # (kept as its own method so that a subclass can override it)
    parsed = field.parse(bytes, count, endian, references, message)
    return parsed
class NumberedMessage(Message):
    """A message whose number is looked up from its name.

    The number is obtained from the 'mesg_num' type via
    ``types.profile_to_type('mesg_num').profile_to_internal(name)``.
    If that lookup raises KeyError, the message is still created, with
    ``number`` set to None, and a warning is logged.
    """

    def __init__(self, log, name, types):
        try:
            number = types.profile_to_type('mesg_num').profile_to_internal(name)
        except KeyError:
            number = None
            # Logger.warn is a deprecated alias of Logger.warning; use the
            # supported name (assumes log is a logging.Logger-compatible object).
            log.warning('No mesg_num for %r' % name)
        super().__init__(log, name, number)
class RowMessage(NumberedMessage):
def __init__(self, log, row, rows, types):
super().__init__(log, row[0], types)
for row in rows:
if not row[2]:
class Header(Message):
    """The 'HEADER' message, with one field per entry in HEADER_FIELDS."""

    def __init__(self, log, types):
        """Register a MessageField for each (name, size, base_type) entry.

        The field number is the entry's position in HEADER_FIELDS; the size
        element of each entry is not used here.
        """
        super().__init__(log, 'HEADER', number=HEADER_GLOBAL_TYPE)
        for position, (field_name, _size, type_name) in enumerate(HEADER_FIELDS):
            field_type = types.profile_to_type(type_name)
            self._add_field(MessageField(log, field_name, position, None, field_type))

    def _parse_field(self, field, data, count, endian, references, message):
        """Parse a header field, skipping 'checksum' when header_size is 12.

        Returns ``(None, None)`` for the skipped checksum; every other case
        is delegated to the parent implementation.
        """
        # presumably the 12-byte header form carries no checksum - confirm
        skip_checksum = field.name == 'checksum' and references['header_size'] == 12
        if skip_checksum:
            return None, None
        return super()._parse_field(field, data, count, endian, references, message)
class Missing(Message):
    """A message identified only by its number, named 'MESSAGE <number>'.

    NOTE(review): presumably a stand-in for message numbers that have no
    other definition - confirm against the callers.
    """

    def __init__(self, log, number):
        label = 'MESSAGE %d' % number
        super().__init__(log, label, number)
class Messages:
def __init__(self, log, sheet, types):
self.__log = log
self.__profile_to_message = ErrorDict(log, 'No message for profile %r')
self.__number_to_message = ErrorDict(log, 'No message for number %r')
rows = peekable([cell.value for cell in row] for row in sheet.iter_rows())
for row in rows:
if row[0] and row[0][0].isupper():
self.__log.debug('Skipping %s' % row)
elif row[0]: