Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Resolve the left operand: a nested description (dict) is evaluated
# recursively, a string is used as a key into measured_signals, and any
# other value is kept as-is.
operand1 = description["operand1"]
if isinstance(operand1, dict):
operand1 = compute(operand1, measured_signals, all_timebase)
elif isinstance(operand1, str):
operand1 = measured_signals[operand1]
# Same resolution for the right operand.
operand2 = description["operand2"]
if isinstance(operand2, dict):
operand2 = compute(operand2, measured_signals, all_timebase)
elif isinstance(operand2, str):
operand2 = measured_signals[operand2]
# NOTE(review): eval() runs a string interpolated from `op`, which is
# defined outside this excerpt -- confirm it is restricted to a fixed set
# of operators before reaching this line.
result = eval(f"operand1 {op} operand2")
# A result without a `name` attribute is wrapped in a Signal whose samples
# are `result` multiplied by an array of ones as long as all_timebase.
if not hasattr(result, "name"):
result = Signal(
name="_",
samples=np.ones(len(all_timebase)) * result,
timestamps=all_timebase,
)
else:
# Function-style description: read the function name and its arguments,
# then resolve the input channel (recursively when it is itself a
# description dict, otherwise by lookup in measured_signals).
function = description["name"]
args = description["args"]
channel = description["channel"]
if isinstance(channel, dict):
channel = compute(channel, measured_signals, all_timebase)
else:
channel = measured_signals[channel]
self.name,
self.conversion,
self.comment,
self.raw,
self.master_metadata,
self.display_name,
self.attachment,
self.source,
self.bit_count,
self.stream_sync,
encoding=self.encoding,
)
elif start is None and stop is None:
# return the channel uncut
# samples, timestamps and (when present) invalidation bits are copied
# explicitly; the remaining attributes are passed through unchanged
result = Signal(
self.samples.copy(),
self.timestamps.copy(),
self.unit,
self.name,
self.conversion,
self.comment,
self.raw,
self.master_metadata,
self.display_name,
self.attachment,
self.source,
self.bit_count,
self.stream_sync,
invalidation_bits=self.invalidation_bits.copy()
if self.invalidation_bits is not None
else None,
def astype(self, np_type):
    """Return a new *Signal* whose samples are cast to *np_type*.

    Only the samples are converted. The timestamps and invalidation bits
    are handed to the new object without an explicit copy, and the other
    attributes (unit, name, conversion, comment, ...) are passed through.

    Parameters
    ----------
    np_type : np.dtype
        new numpy dtype

    Returns
    -------
    signal : Signal
        new *Signal* with the samples of *np_type* dtype

    """
    return Signal(
        self.samples.astype(np_type),
        self.timestamps,
        unit=self.unit,
        name=self.name,
        conversion=self.conversion,
        # carry the comment over as the other Signal-copying code paths do;
        # without it the cast result silently loses the channel comment
        comment=self.comment,
        raw=self.raw,
        master_metadata=self.master_metadata,
        display_name=self.display_name,
        attachment=self.attachment,
        stream_sync=self.stream_sync,
        invalidation_bits=self.invalidation_bits,
        source=self.source,
        encoding=self.encoding,
    )
# Timestamps are only assembled when the caller wants them back or when
# resampling to a raster is requested.
if not samples_only or raster:
if count > 1:
# presumably `timestamps` is a list of per-fragment arrays here -- confirm
timestamps = concatenate(timestamps)
else:
timestamps = timestamps[0]
# Resample onto a fixed-step time base when a raster is given and there is
# more than one timestamp.
if raster and len(timestamps) > 1:
# the step count is passed through float32 before the integer test --
# presumably to absorb float64 rounding noise (TODO confirm)
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if num.is_integer():
# NOTE(review): if this is numpy.linspace, int(num) points give a step of
# (last - first) / (num - 1) rather than exactly `raster` -- confirm intent
t = linspace(timestamps[0], timestamps[-1], int(num))
else:
t = arange(timestamps[0], timestamps[-1], raster)
# interpolate the values onto the new time base through a temporary Signal
vals = (
Signal(vals, timestamps, name="_")
.interp(t, interpolation_mode=self._integer_interpolation)
.samples
)
timestamps = t
# A missing conversion is handled the same way as an explicit
# CONVERSION_TYPE_NONE.
if conversion is None:
conversion_type = v23c.CONVERSION_TYPE_NONE
else:
conversion_type = conversion.conversion_type
if conversion_type == v23c.CONVERSION_TYPE_NONE:
# byte-string samples (dtype kind "S") are tagged with the latin-1 encoding
if vals.dtype.kind == "S":
encoding = "latin-1"
if not raw:
# NOTE(review): the opening of this parenthesised expression (and the
# condition selecting this branch) is not visible in this excerpt.
# The chain views the data as uint16, byte-swaps it and views it back with
# the original dtype, i.e. it swaps the byte order of every 16-bit unit.
samples.view(np.uint16)
.byteswap()
.view(samples.dtype)
)
# after the swap the text is decoded as UTF-16 big-endian and re-encoded
# as latin-1
samples = encode(
decode(samples, "utf-16-be"), "latin-1"
)
else:
# any other encoding: decode with it and re-encode as latin-1
samples = encode(
decode(samples, encoding), "latin-1"
)
sig.samples = samples
# With a non-empty master, interpolate the (samples, invalidation_bits) pair
# onto it through a temporary Signal and unpack the pair again.
if len(master):
sig = (
Signal(sig[0], t, invalidation_bits=sig[1], name='_')
.interp(master, interpolation_mode=interpolation_mode)
)
sig = sig.samples, sig.invalidation_bits
# replace a read-only samples array with a writeable copy
if not sig[0].flags.writeable:
sig = sig[0].copy(), sig[1]
sigs.append(sig)
# presumably appends the collected signals to channel group cg_nr of the
# output file -- confirm against mdf.extend
if sigs:
mdf.extend(cg_nr, sigs)
# the callback receives (cg_nr + 1, groups_nr); looks like a progress
# report -- confirm
if self._callback:
self._callback(cg_nr + 1, groups_nr)
# stop early when the terminate flag is set
if self._terminate:
return
new_timestamps : np.array
timestamps used for interpolation
interpolation_mode : int
interpolation mode for integer signals; default 0
* 0 - repeat previous samples
* 1 - linear interpolation
Returns
-------
signal : Signal
new interpolated *Signal*
"""
# Nothing to interpolate: when this signal has no samples or no target
# timestamps were given, return a copy of this signal. Samples, timestamps
# and (when present) invalidation bits are copied; the other attributes are
# passed through.
if not len(self.samples) or not len(new_timestamps):
return Signal(
self.samples.copy(),
self.timestamps.copy(),
self.unit,
self.name,
comment=self.comment,
conversion=self.conversion,
raw=self.raw,
master_metadata=self.master_metadata,
display_name=self.display_name,
attachment=self.attachment,
stream_sync=self.stream_sync,
invalidation_bits=self.invalidation_bits.copy()
if self.invalidation_bits is not None
else None,
encoding=self.encoding,
)
# optionally shift the timestamps by `delta` (computed outside this excerpt)
if time_from_zero:
timestamps = timestamps - delta
# the list starts with the timestamps paired with None; presumably this is
# the master entry with no invalidation bits -- confirm
sigs = [(timestamps, None)]
for j in included_channels:
# raw samples only; the result is indexed [0]/[1] below, so it is
# presumably a (samples, invalidation_bits) pair -- confirm
sig = self.get(
group=i,
index=j,
data=fragment,
raw=True,
samples_only=True,
ignore_invalidation_bits=True,
)
if needs_cutting:
# interpolate onto cut_timebase through a temporary Signal and keep only
# the samples and invalidation bits
_sig = Signal(
sig[0], master, name="_", invalidation_bits=sig[1]
).interp(cut_timebase, interpolation_mode=interpolation_mode)
sig = (_sig.samples, _sig.invalidation_bits)
del _sig
sigs.append(sig)
if sigs:
out.extend(cg_nr, sigs)
idx += 1
# drop the record reference held by the group
group.record = None
# if the cut interval is not found in the measurement
# then append an empty data group
# NOTE(review): X1 is not referenced again in the visible code; presumably
# `formula` refers to it by name when evaluated -- confirm
X1 = vals
vals = evaluate(formula)
if samples_only:
res = vals
else:
# the unit comes from the conversion when there is one, otherwise it is empty
if conversion:
unit = conversion['unit'].decode('latin-1').strip(' \n\t\0')
else:
unit = ''
# the description is decoded as latin-1 and stripped of surrounding
# whitespace and NUL characters
comment = channel['description'].decode('latin-1')
comment = comment.strip(' \t\n\0')
t = self.get_master(gp_nr, data)
res = Signal(
samples=vals,
timestamps=t,
unit=unit,
name=channel.name,
comment=comment,
info=info,
)
# NOTE(review): if `t` is a numpy array with more than one element,
# `raster and t` raises ValueError (ambiguous truth value); presumably
# `len(t)` was intended -- confirm the type returned by get_master
if raster and t:
tx = linspace(0, t[-1], int(t[-1] / raster))
res = res.interp(tx)
return res
""" appply invalidation bits if they are available for this signal
Arguments
---------
copy (True) : bool
return a copy of the result
.. versionadded:: 5.12.0
"""
# without invalidation bits there is nothing to filter: use this object
if self.invalidation_bits is None:
signal = self
else:
# indices of the entries whose invalidation bit is not set
# (assumes invalidation_bits is a boolean array -- TODO confirm)
idx = np.nonzero(~self.invalidation_bits)[0]
# new Signal holding only the selected samples and timestamps; it is built
# with invalidation_bits=None
signal = Signal(
self.samples[idx],
self.timestamps[idx],
self.unit,
self.name,
self.conversion,
self.comment,
self.raw,
self.master_metadata,
self.display_name,
self.attachment,
self.source,
self.bit_count,
self.stream_sync,
invalidation_bits=None,
encoding=self.encoding,
)
# take the text of the TX element from the XML comment; any failure falls
# back to an empty comment
try:
comment = XML.fromstring(comment).find('TX').text
# NOTE(review): a bare except also swallows KeyboardInterrupt/SystemExit;
# presumably only XML parse errors or a missing TX element are expected
except:
comment = ''
else:
comment = comment['text'].decode('utf-8')
else:
comment = ''
t = self.get_master(gp_nr, data)
# consider invalidation bits
# (only the timestamps selected by valid_index are kept)
if valid_index is not None:
t = t[valid_index]
res = Signal(
samples=vals,
timestamps=t,
unit=unit,
name=channel.name,
comment=comment,
info=info,
)
# NOTE(review): if `t` is a numpy array with more than one element,
# `raster and t` raises ValueError (ambiguous truth value); presumably
# `len(t)` was intended -- confirm the type returned by get_master
if raster and t:
tx = linspace(0, t[-1], int(t[-1] / raster))
res = res.interp(tx)
return res