Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sortedchannels = sorted(enumerate(channels), key=lambda i: i[1])
for original_index, new_ch in sortedchannels:
# skip channels with channel dependencies from the numpy record
if new_ch['ch_depend_addr']:
continue
start_offset = new_ch['start_offset']
bit_offset = start_offset % 8
data_type = new_ch['data_type']
bit_count = new_ch['bit_count']
if memory == 'minimum':
channel_texts = grp['texts']['channels'][original_index]
if channel_texts and 'long_name_addr' in channel_texts:
address = grp['texts']['channels'][original_index]['long_name_addr']
block = TextBlock(
address=address,
stream=stream,
)
name = block['text'].decode('latin-1').strip(' \r\n\t\0')
else:
name = new_ch['short_name'].decode('latin-1').strip(' \r\n\t\0')
name = name.split('\\')[0]
else:
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
if channel_texts and 'long_name_addr' in channel_texts:
address = grp['texts']['channels'][original_index]['long_name_addr']
block = TextBlock(
address=address,
stream=stream,
)
name = block['text'].decode('latin-1').strip(' \r\n\t\0')
else:
name = new_ch['short_name'].decode('latin-1').strip(' \r\n\t\0')
name = name.split('\\')[0]
else:
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
# check if there are byte gaps in the record
gap = (parent_start_offset - next_byte_aligned_position) // 8
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
size = bit_offset + bit_count
if data_type == v3c.DATA_TYPE_STRING:
next_byte_aligned_position = parent_start_offset + size
size = size // 8
if next_byte_aligned_position <= record_size:
if channel_texts and 'long_name_addr' in channel_texts:
address = grp['texts']['channels'][original_index]['long_name_addr']
block = TextBlock(
address=address,
stream=stream,
)
name = block['text'].decode('latin-1').strip(' \r\n\t\0')
else:
name = new_ch['short_name'].decode('latin-1').strip(' \r\n\t\0')
name = name.split('\\')[0]
else:
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
# check if there are byte gaps in the record
gap = (parent_start_offset - next_byte_aligned_position) // 8
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
size = bit_offset + bit_count
if data_type == v2c.DATA_TYPE_STRING:
next_byte_aligned_position = parent_start_offset + size
size = size // 8
if next_byte_aligned_position <= record_size:
elif conversion_type == v4c.CONVERSION_TYPE_ALG:
block = grp['texts']['conversions'][ch_nr]['formula_addr']
formula = block['text'].decode('utf-8').strip(' \n\t\0')
X = vals
vals = evaluate(formula)
elif conversion_type in (v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB):
nr = conversion['val_param_nr'] // 2
raw = array(
[conversion['raw_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
if conversion_type == v4c.CONVERSION_TYPE_TABI:
vals = interp(vals, raw, phys)
else:
idx = searchsorted(raw, vals)
idx = clip(idx, 0, len(raw) - 1)
vals = phys[idx]
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (conversion['val_param_nr'] - 1) // 3
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
v4c.CONVERSION_TYPE_TAB):
nr = conversion['val_param_nr'] // 2
raw = array(
[conversion['raw_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
if conversion_type == v4c.CONVERSION_TYPE_TABI:
vals = interp(vals, raw, phys)
else:
idx = searchsorted(raw, vals)
idx = clip(idx, 0, len(raw) - 1)
vals = phys[idx]
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (conversion['val_param_nr'] - 1) // 3
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
default = conversion['default']
# INT channel
if channel['data_type'] <= 3:
res = []
[conversion['val_{}'.format(i)] for i in range(nr)]
)
phys = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr]\
.get('default_addr', {})\
.get('text', b'')
info = {
'raw': raw,
'phys': phys,
'default': default,
}
elif conversion_type == v4c.CONVERSION_TYPE_RTABX:
nr = conversion['val_param_nr'] // 2
phys = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr]\
.get('default_addr', {})\
.get('text', b'')
info = {
def _load_data(self, group, record_offset=0, record_count=None, optimize_read=True):
""" get group's data block bytes"""
has_yielded = False
offset = 0
_count = record_count
channel_group = group.channel_group
if group.data_location == v23c.LOCATION_ORIGINAL_FILE:
# go to the first data block of the current data group
stream = self._file
else:
stream = self._tempfile
record_offset *= channel_group.samples_byte_nr
# go to the first data block of the current data group
if group.sorted:
samples_size = channel_group.samples_byte_nr
if not samples_size:
yield b"", 0, _count
has_yielded = True
else:
if self._read_fragment_size:
split_size = self._read_fragment_size // samples_size
def _get_not_byte_aligned_data(self, data, group, ch_nr):
big_endian_types = (
v23c.DATA_TYPE_UNSIGNED_MOTOROLA,
v23c.DATA_TYPE_FLOAT_MOTOROLA,
v23c.DATA_TYPE_DOUBLE_MOTOROLA,
v23c.DATA_TYPE_SIGNED_MOTOROLA,
)
record_size = group.channel_group.samples_byte_nr
channel = group.channels[ch_nr]
bit_offset = channel.start_offset % 8
byte_offset = channel.start_offset // 8
bit_count = channel.bit_count
byte_count = bit_offset + bit_count
if byte_count % 8:
byte_count = (byte_count // 8) + 1
else:
byte_count //= 8
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
# channel group
kargs = {
"cycles_nr": cycles_nr,
"samples_byte_nr": offset // 8,
"ch_nr": ch_cntr,
}
if self.version >= "3.30":
kargs["block_len"] = v23c.CG_POST_330_BLOCK_SIZE
else:
kargs["block_len"] = v23c.CG_PRE_330_BLOCK_SIZE
gp.channel_group = ChannelGroup(**kargs)
gp.channel_group.comment = source_info
# data group
if self.version >= "3.20":
block_len = v23c.DG_POST_320_BLOCK_SIZE
else:
block_len = v23c.DG_PRE_320_BLOCK_SIZE
gp.data_group = DataGroup(block_len=block_len)
# data block
types = dtype(types)
gp.types = types
gp.parents = parents
gp.sorted = True
name = new_ch.name
# handle multiple occurance of same channel name
name = group_channels.get_unique_name(name)
if start_offset >= next_byte_aligned_position:
parent_start_offset = (start_offset // 8) * 8
# check if there are byte gaps in the record
gap = (parent_start_offset - next_byte_aligned_position) // 8
if gap:
types.append(("", f"V{gap}"))
# adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
size = bit_offset + bit_count
if data_type == v23c.DATA_TYPE_STRING:
next_byte_aligned_position = parent_start_offset + size
if next_byte_aligned_position <= record_size:
dtype_pair = (name, get_fmt_v3(data_type, size))
types.append(dtype_pair)
parents[original_index] = name, bit_offset
else:
next_byte_aligned_position = parent_start_offset
elif data_type == v23c.DATA_TYPE_BYTEARRAY:
next_byte_aligned_position = parent_start_offset + size
if next_byte_aligned_position <= record_size:
dtype_pair = (name, get_fmt_v3(data_type, size))
types.append(dtype_pair)
parents[original_index] = name, bit_offset
else: