Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read(session):
test_voltage = 1.0
test_record_length = 2000
test_channels = range(2)
test_num_channels = 2
test_num_records = 3
session.configure_vertical(test_voltage, niscope.VerticalCoupling.AC)
session.configure_horizontal_timing(50000000, test_record_length, 50.0, test_num_records, True)
waveforms = session.channels[test_channels].read(num_samples=test_record_length, num_records=test_num_records)
assert len(waveforms) == test_num_channels * test_num_records
for i in range(len(waveforms)):
assert len(waveforms[i].samples) == test_record_length
def example(resource_name, options, total_acquisition_time_in_seconds, voltage, sample_rate_in_hz, samples_per_fetch):
total_samples = int(total_acquisition_time_in_seconds * sample_rate_in_hz)
# 1. Opening session
with niscope.Session(resource_name=resource_name, options=options) as session:
# We will acquire on all channels of the device
channel_list = [c for c in range(session.channel_count)] # Need an actual list and not a range
# 2. Creating numpy arrays
waveforms = [np.ndarray(total_samples, dtype=np.float64) for c in channel_list]
# 3. Configuring
session.configure_horizontal_timing(min_sample_rate=sample_rate_in_hz, min_num_pts=1, ref_position=0.0, num_records=1, enforce_realtime=True)
session.channels[channel_list].configure_vertical(voltage, coupling=niscope.VerticalCoupling.DC, enabled=True)
# Configure software trigger, but never send the trigger.
# This starts an infinite acquisition, until you call session.abort() or session.close()
session.configure_trigger_software()
current_pos = 0
# 4. initiating
with session.initiate():
while current_pos < total_samples:
# We fetch each channel at a time so we don't have to de-interleave afterwards
# We do not keep the wfm_info returned from fetch_into
for channel, waveform in zip(channel_list, waveforms):
# 5. fetching - we return the slice of the waveform array that we want to "fetch into"
session.channels[channel].fetch_into(waveform[current_pos:current_pos + samples_per_fetch], relative_to=niscope.FetchRelativeTo.READ_POINTER,
offset=0, record_number=0, num_records=1, timeout=hightime.timedelta(seconds=5.0))
current_pos += samples_per_fetch