Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
elif conversion_type == v4c.CONVERSION_TYPE_ALG:
block = grp['texts']['conversions'][ch_nr]['formula_addr']
formula = block['text'].decode('utf-8').strip(' \n\t\0')
X = vals
vals = evaluate(formula)
elif conversion_type in (v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB):
nr = conversion['val_param_nr'] // 2
raw = array(
[conversion['raw_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
if conversion_type == v4c.CONVERSION_TYPE_TABI:
vals = interp(vals, raw, phys)
else:
idx = searchsorted(raw, vals)
idx = clip(idx, 0, len(raw) - 1)
vals = phys[idx]
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (conversion['val_param_nr'] - 1) // 3
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
v4c.CONVERSION_TYPE_TAB):
nr = conversion['val_param_nr'] // 2
raw = array(
[conversion['raw_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
if conversion_type == v4c.CONVERSION_TYPE_TABI:
vals = interp(vals, raw, phys)
else:
idx = searchsorted(raw, vals)
idx = clip(idx, 0, len(raw) - 1)
vals = phys[idx]
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (conversion['val_param_nr'] - 1) // 3
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
default = conversion['default']
# INT channel
if channel['data_type'] <= 3:
res = []
[conversion['val_{}'.format(i)] for i in range(nr)]
)
phys = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr]\
.get('default_addr', {})\
.get('text', b'')
info = {
'raw': raw,
'phys': phys,
'default': default,
}
elif conversion_type == v4c.CONVERSION_TYPE_RTABX:
nr = conversion['val_param_nr'] // 2
phys = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr]\
.get('default_addr', {})\
.get('text', b'')
info = {
def _get_not_byte_aligned_data(self, data, group, ch_nr):
big_endian_types = (
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
v4c.DATA_TYPE_REAL_MOTOROLA,
v4c.DATA_TYPE_SIGNED_MOTOROLA,
)
record_size = group['channel_group']['samples_byte_nr']
channel = group['channels'][ch_nr]
bit_offset = channel['bit_offset']
byte_offset = channel['byte_offset']
bit_count = channel['bit_count']
dependency_list = group['channel_dependencies'][ch_nr]
if dependency_list and isinstance(dependency_list[0], ChannelArrayBlock):
ca_block = dependency_list[0]
size = bit_count >> 3
shape = tuple(
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = 'utf-16-le'
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = 'utf-8'
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = 'latin-1'
vals = array(
[x.decode(encoding).strip('\0') for x in vals]
)
vals = encode(vals, 'latin-1')
# CANopen date
elif data_type == v4c.DATA_TYPE_CANOPEN_DATE:
vals = vals.tostring()
types = dtype(
[('ms', '
vals = vals & mask
if data_type in v4c.SIGNED_INT:
size = vals.dtype.itemsize
mask = (1 << (size * 8)) - 1
mask = (mask << bits) & mask
vals |= mask
vals = vals.astype('
res.append(o)
break
else:
res.append(default)
vals = array(res)
info = {
'input': in_,
'output': out_,
'default': default,
}
# in case of invalidation bits, valid_index will hold the valid indexes
valid_index = None
if grp['channel_group']['invalidation_bytes_nr']:
if channel['flags'] & (v4c.FLAG_INVALIDATION_BIT_VALID | v4c.FLAG_ALL_SAMPLES_VALID) == v4c.FLAG_INVALIDATION_BIT_VALID:
ch_invalidation_pos = channel['pos_invalidation_bit']
pos_byte, pos_offset = divmod(ch_invalidation_pos)
mask = 1 << pos_offset
inval_bytes = record['invalidation_bytes']
inval_index = array(
[bytes_[pos_byte] & mask for bytes_ in inval_bytes]
)
valid_index = argwhere(inval_index == 0).flatten()
vals = vals[valid_index]
if samples_only:
res = vals
else:
# search for unit in conversion texts
group=dg_nr,
index=ch_nr,
samples_only=True)
axis_values = axis_values[axis.name]
arrays.append(axis_values)
dtype_pair = axis.name, axis_values.dtype, shape
types.append(dtype_pair)
if PYVERSION == 2:
types = fix_dtype_fields(types)
vals = fromarrays(arrays, dtype(types))
else:
# get channel values
if channel['channel_type'] in (v4c.CHANNEL_TYPE_VIRTUAL,
v4c.CHANNEL_TYPE_VIRTUAL_MASTER):
data_type = channel['data_type']
ch_dtype = dtype(get_fmt(data_type, 8))
cycles = grp['channel_group']['cycles_nr']
vals = arange(cycles, dtype=ch_dtype)
else:
try:
parent, bit_offset = parents[ch_nr]
except KeyError:
parent, bit_offset = None, None
if parent is not None:
if 'record' not in grp:
if dtypes.itemsize:
record = fromstring(data, dtype=dtypes)
else:
record = None
axis = self.groups[dg_nr]['channels'][ch_nr]
shape = (ca_block['dim_size_{}'.format(i)], )
axis_values = self.get(
group=dg_nr,
index=ch_nr,
samples_only=True)
axis_values = axis_values[axis.name]
arrays.append(axis_values)
dtype_pair = (
axis.name,
axis_values.dtype,
shape,
)
types.append(dtype_pair)
elif ca_block['ca_type'] == v4c.CA_TYPE_ARRAY:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
for ca_block in dependency_list[1:]:
dims_nr = ca_block['dims']
if ca_block['flags'] & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block['dim_size_{}'.format(i)], )
axis = []
for j in range(shape[0]):
key = 'axis_{}_value_{}'.format(i, j)
axis.append(ca_block[key])
axis = array([axis for _ in range(cycles_nr)])
vals |= mask
vals = vals.astype('