sortedchannels = sorted(enumerate(channels), key=lambda i: i[1])
for original_index, new_ch in sortedchannels:
    # skip channels with channel dependencies from the numpy record
    if new_ch['ch_depend_addr']:
        continue

    start_offset = new_ch['start_offset']
    bit_offset = start_offset % 8
    data_type = new_ch['data_type']
    bit_count = new_ch['bit_count']

    if memory == 'minimum':
        channel_texts = grp['texts']['channels'][original_index]
        if channel_texts and 'long_name_addr' in channel_texts:
            address = grp['texts']['channels'][original_index]['long_name_addr']
            block = TextBlock(
                address=address,
                stream=stream,
            )
            name = block['text'].decode('latin-1').strip(' \r\n\t\0')
        else:
            name = new_ch['short_name'].decode('latin-1').strip(' \r\n\t\0')
        name = name.split('\\')[0]
    else:
        name = new_ch.name

    # handle multiple occurrences of the same channel name
    name = get_unique_name(group_channels, name)
    group_channels.add(name)

    if start_offset >= next_byte_aligned_position:
        parent_start_offset = (start_offset // 8) * 8
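A quick numeric illustration of the byte-alignment arithmetic above (not part of the asammdf source): a channel whose bits start at record offset 19 sits 3 bits into its byte, and its byte-aligned parent field starts at bit offset 16, i.e. at the beginning of byte 2 of the record.

start_offset = 19
bit_offset = start_offset % 8                     # -> 3, bit position inside the byte
parent_start_offset = (start_offset // 8) * 8     # -> 16, byte-aligned start in bits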
if self.file_history is None:
    self.file_history = TextBlock(text='''created
asammdf
{}
'''.format(__version__))
else:
    text = '{}\n{}: updated by asammdf {}'
    old_history = self.file_history['text'].decode('latin-1')
    # keep the timestamp as str so that str.format() does not embed b'...'
    timestamp = time.asctime()
    text = text.format(
        old_history,
        timestamp,
        __version__,
    )
    self.file_history = TextBlock(text=text)

if self.name is None and dst == '':
    message = ('Must specify a destination file name '
               'for MDF created from scratch')
    raise MdfException(message)

dst = dst if dst else self.name

if overwrite is False:
    if os.path.isfile(dst):
        cntr = 0
        while True:
            name = os.path.splitext(dst)[0] + '_{}.mdf'.format(cntr)
            if not os.path.isfile(name):
                break
            else:
                cntr += 1
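        # illustrative note (not in the source): with dst == 'out.mdf', if both
        # 'out.mdf' and 'out_0.mdf' already exist, the loop above stops with
        # name == 'out_1.mdf'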
for gp in self.groups:
    gp_texts = deepcopy(gp['texts'])
    if gp['data_location'] == v3c.LOCATION_ORIGINAL_FILE:
        stream = self._file
    else:
        stream = self._tempfile

    # Texts
    for item_list in gp_texts.values():
        for my_dict in item_list:
            if my_dict is None:
                continue
            for key, tx_block in my_dict.items():
                # text blocks can be shared
                block = TextBlock(
                    address=tx_block,
                    stream=stream,
                )
                text = block['text']
                if text in defined_texts:
                    my_dict[key] = defined_texts[text]
                else:
                    address = tell()
                    defined_texts[text] = address
                    my_dict[key] = address
                    write(bytes(block))

    # ChannelConversions
    cc = gp['temp_channel_conversions'] = []
    for i, conv in enumerate(gp['channel_conversions']):
        if not conv:
if self.file_history is None:
    self.file_history = TextBlock(text='''created
asammdf
{}
'''.format(__version__))
else:
    text = '{}\n{}: updated by asammdf {}'
    old_history = self.file_history['text'].decode('latin-1')
    # keep the timestamp as str so that str.format() does not embed b'...'
    timestamp = time.asctime()
    text = text.format(
        old_history,
        timestamp,
        __version__,
    )
    self.file_history = TextBlock(text=text)

# all MDF blocks are appended to the blocks list in the order in which
# they will be written to disk. While creating this list, all the
# relevant block links are updated so that once all blocks have been
# added to the list they can be written using the bytes protocol.
# DataGroup blocks are written first after the identification and
# header blocks. When memory=False we need to restore the
# original data block addresses within the data group block. This is
# needed to allow further work with the object after the save method
# call (e.g. new calls to the get method). Since the data group blocks are
# written first, it is safe to restore the original links when the data
# blocks are written. For memory=False the blocks list will
# contain a tuple instead of a DataBlock instance; the tuple will have
# the reference to the data group object and the original link to the
# data block in the source MDF file.
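A minimal standalone sketch of the write pattern described in the comment above (illustrative only, not the actual asammdf save code; the all_blocks and dst names, the block.address attribute and the block['block_len'] size key are assumptions here):

blocks = []
address = 0

# first pass: collect the blocks in file order and assign their final addresses
for block in all_blocks:
    block.address = address
    blocks.append(block)
    address += block['block_len']

# second pass: serialize everything through the bytes protocol
with open(dst, 'wb') as output:
    for block in blocks:
        output.write(bytes(block))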
dg_addr = self.header['first_dg_addr']
# read each data group sequentially
while dg_addr:
    gp = DataGroup(address=dg_addr, stream=stream)
    record_id_nr = gp['record_id_nr']
    cg_nr = gp['cg_nr']
    cg_addr = gp['first_cg_addr']
    data_addr = gp['data_block_addr']

    # read trigger information if available
    trigger_addr = gp['trigger_addr']
    if trigger_addr:
        trigger = TriggerBlock(address=trigger_addr,
                               stream=stream)
        if trigger['text_addr']:
            trigger_text = TextBlock(
                address=trigger['text_addr'],
                stream=stream,
            )
        else:
            trigger_text = None
    else:
        trigger = None
        trigger_text = None

    new_groups = []
    for i in range(cg_nr):
        new_groups.append({})
        grp = new_groups[-1]
        grp['channels'] = []
        grp['channel_conversions'] = []
if memory != 'minimum':
    grp_conv.append(None)
else:
    grp_conv.append(0)

vtab_texts = {}
if new_conv:
    conv_type = new_conv['conversion_type']
else:
    conv_type = 0
if conv_type == v3c.CONVERSION_TYPE_VTABR:
    for idx in range(new_conv['ref_param_nr']):
        address = new_conv['text_{}'.format(idx)]
        if address:
            if memory != 'minimum':
                block = TextBlock(
                    address=address,
                    stream=stream,
                )
                vtab_texts['text_{}'.format(idx)] = block
            else:
                vtab_texts['text_{}'.format(idx)] = address
if vtab_texts:
    grp['texts']['conversion_tab'].append(vtab_texts)
else:
    grp['texts']['conversion_tab'].append(None)

address = new_ch['source_depend_addr']
if memory != 'minimum':
    if address:
        block = ChannelExtension(
            address=address,
            stream=stream,
        )
        grp['channel_extensions'].append(block)
    else:
        grp['channel_extensions'].append(None)
else:
    grp['channel_extensions'].append(address)

# read text fields for channel
ch_texts = {}
for key in (
        'long_name_addr',
        'comment_addr',
        'display_name_addr'):
    address = new_ch[key]
    if address:
        if memory != 'minimum':
            ch_texts[key] = TextBlock(
                address=address,
                stream=stream,
            )
        else:
            ch_texts[key] = address

if ch_texts:
    grp_ch_texts.append(ch_texts)
else:
    grp_ch_texts.append(None)

# update channel object name and block_size attributes
if new_ch['long_name_addr']:
    if memory != 'minimum':
        name = ch_texts['long_name_addr']['text']
    else:
        block = TextBlock(
            address=new_ch['long_name_addr'],
            stream=stream,
        )
        name = block['text']
read = stream.read
seek = stream.seek

dg_cntr = 0
seek(0, v3c.SEEK_START)

self.identification = FileIdentificationBlock(
    stream=stream,
)
self.header = HeaderBlock(stream=stream)

self.version = self.identification['version_str']\
    .decode('latin-1')\
    .strip(' \n\t\0')

self.file_history = TextBlock(
    address=self.header['comment_addr'],
    stream=stream,
)

# this will hold the mapping from channel address to Channel object,
# needed for linking dependency blocks to referenced channels after
# the file is loaded
ch_map = {}

# go to the first data group
dg_addr = self.header['first_dg_addr']
# read each data group sequentially
while dg_addr:
    gp = DataGroup(address=dg_addr, stream=stream)
    record_id_nr = gp['record_id_nr']
    cg_nr = gp['cg_nr']
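The ch_map dictionary initialized above is meant to be used once every channel has been read: dependency blocks only store raw channel addresses, and those addresses can then be swapped for the already-loaded Channel objects. A rough illustration of that idea (the referenced_addresses and referenced_channels attribute names are hypothetical, not the actual asammdf attributes):

# hypothetical post-load pass showing what ch_map is for
for grp in self.groups:
    for dependency in grp['channel_dependencies']:
        if dependency is None:
            continue
        # replace each stored channel address with the Channel object read earlier
        dependency.referenced_channels = [
            ch_map[addr] for addr in dependency.referenced_addresses
        ]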