Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if channel_texts and 'long_name_addr' in channel_texts:
address = grp['texts']['channels'][original_index]['long_name_addr']
block = TextBlock(
address=address,
stream=stream,
)
name = block['text'].decode('latin-1').strip(' \r\n\t\0')
else:
name = new_ch['short_name'].decode('latin-1').strip(' \r\n\t\0')
name = name.split('\\')[0]
else:
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
# check if there are byte gaps in the record
gap = (parent_start_offset - next_byte_aligned_position) // 8
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
size = bit_offset + bit_count
if data_type == v3c.DATA_TYPE_STRING:
next_byte_aligned_position = parent_start_offset + size
size = size // 8
if next_byte_aligned_position <= record_size:
if channel_texts and 'long_name_addr' in channel_texts:
address = grp['texts']['channels'][original_index]['long_name_addr']
block = TextBlock(
address=address,
stream=stream,
)
name = block['text'].decode('latin-1').strip(' \r\n\t\0')
else:
name = new_ch['short_name'].decode('latin-1').strip(' \r\n\t\0')
name = name.split('\\')[0]
else:
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
# check if there are byte gaps in the record
gap = (parent_start_offset - next_byte_aligned_position) // 8
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
size = bit_offset + bit_count
if data_type == v2c.DATA_TYPE_STRING:
next_byte_aligned_position = parent_start_offset + size
size = size // 8
if next_byte_aligned_position <= record_size:
continue
start_offset = new_ch['start_offset']
try:
additional_byte_offset = new_ch['aditional_byte_offset']
start_offset += 8 * additional_byte_offset
except KeyError:
pass
bit_offset = start_offset % 8
data_type = new_ch['data_type']
bit_count = new_ch['bit_count']
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
# check if there are byte gaps in the record
gap = (parent_start_offset - next_byte_aligned_position) // 8
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
size = bit_offset + bit_count
if data_type == v23c.DATA_TYPE_STRING:
next_byte_aligned_position = parent_start_offset + size
if next_byte_aligned_position <= record_size:
dtype_pair = (name, get_fmt_v3(data_type, size))
# adjusted to the first higher standard integer size (eq. uint of 28bits will
# be adjusted to 32bits)
sortedchannels = sorted(enumerate(grp['channels']), key=lambda i: i[1])
for original_index, new_ch in sortedchannels:
start_offset = new_ch['byte_offset']
bit_offset = new_ch['bit_offset']
data_type = new_ch['data_type']
bit_count = new_ch['bit_count']
ch_type = new_ch['channel_type']
dependency_list = grp['channel_dependencies'][original_index]
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
if ch_type not in (v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
v4c.CHANNEL_TYPE_VIRTUAL):
if not dependency_list:
parent_start_offset = start_offset
# check if there are byte gaps in the record
gap = parent_start_offset - next_byte_aligned_position
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes
size = bit_offset + bit_count
if data_type not in (v4c.DATA_TYPE_BYTEARRAY,
parent_dep = ChannelArrayBlock(**kargs)
gp_dep.append([parent_dep, ])
else:
# add channel dependency block for composed parent channel
kargs = {
'dims': 1,
'ca_type': v4c.CA_TYPE_SCALE_AXIS,
'flags': 0,
'byte_offset_base': samples.dtype.itemsize,
'dim_size_0': shape[0],
}
parent_dep = ChannelArrayBlock(**kargs)
gp_dep.append([parent_dep, ])
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
fields.append(samples)
dtype_pair = field_name, samples.dtype, shape
types.append(dtype_pair)
# first we add the structure channel
# add channel texts
for item in gp['texts'].values():
item.append({})
gp_texts['channels'][-1]['name_addr'] = TextBlock(
text=name,
meta=False,
)
if signal.unit:
gp_texts['channels'][-1]['unit_addr'] = TextBlock(
channel = Channel(**kargs)
channel.name = name
channel.source = source
channel.conversion = conversion
if memory != 'minimum':
new_gp_channels.append(channel)
else:
channel.to_stream(file, defined_texts, cc_map, si_map)
new_gp_channels.append(channel.address)
new_offset += s_size
self.channels_db.add(name, new_dg_cntr, new_ch_cntr)
# update the parents as well
field_name = get_unique_name(new_field_names, name)
new_parents[new_ch_cntr] = field_name, 0
new_fields.append(samples)
new_types.append((field_name, samples.dtype))
new_field_names.add(field_name)
new_ch_cntr += 1
# simple channels don't have channel dependencies
new_gp_dep.append(None)
# channel group
kargs = {
'cycles_nr': cycles_nr,
'samples_byte_nr': new_offset >> 3,
'ch_nr': new_ch_cntr,
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = name, 0
ch_cntr += 1
dep_list = []
gp_dep.append(dep_list)
# then we add the fields
for name in names:
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
samples = signal.samples[name]
s_type, s_size = fmt_to_datatype(samples.dtype)
byte_size = s_size >> 3
fields.append(samples)
types.append((field_name, samples.dtype))
types.append(vals.dtype)
# add channel texts
for item in gp['texts'].values():
item.append({})
gp_texts['channels'][-1]['name_addr'] = TextBlock(
text=name,
tail = compacted_signals.pop()
size = tail['bit_count']
cluster.append(tail)
while size < dtype_size and compacted_signals:
head = compacted_signals[0]
head_size = head['bit_count']
if head_size + size > dtype_size:
break
else:
cluster.append(compacted_signals.pop(0))
size += head_size
bit_offset = 0
field_name = get_unique_name(field_names, 'COMPACT')
types.append((field_name, dtype_))
field_names.add(field_name)
values = zeros(cycles_nr, dtype=dtype_)
for signal_d in cluster:
signal = signal_d['signal']
bit_count = signal_d['bit_count']
min_val = signal_d['min']
max_val = signal_d['max']
name = signal.name
for _, item in gp['texts'].items():
item.append({})