-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnidaqmx_usb6229_fastscan2.py
77 lines (60 loc) · 2.99 KB
/
nidaqmx_usb6229_fastscan2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import time
import numpy as np
import nidaqmx
from nidaqmx import constants
from nidaqmx import stream_readers
from matplotlib import pyplot as plt
system = nidaqmx.system.System.local()
system.driver_version
for device in system.devices:
print(device.name)
print(device.ai_simultaneous_sampling_supported)
print(device.ai_samp_modes)
class PLLreadout_nidaqmx(object):
def __init__(self, scanTime, sampRate=300, bufferSize=1000, triggerSrc='') -> None:
super().__init__()
self.Ch00_name = 'A00'
self.scanTime = scanTime
self.sampRate= sampRate
self.bufferSize = bufferSize
print(self.scanTime*self.sampRate)
self.dataSize = int(self.scanTime*self.sampRate)
self.triggerSrc = triggerSrc
def sleep(self, duration, get_now=time.perf_counter):
now = get_now()
end = now + duration
while now < end:
now = get_now()
def read(self):
with nidaqmx.Task() as task, nidaqmx.Task() as task2, nidaqmx.Task() as task3:
task.ai_channels.add_ai_voltage_chan(physical_channel="/Dev1/ai0:7", min_val=-10, max_val=10)
task.ai_channels.add_ai_voltage_chan(physical_channel="/Dev1/ai16:17", min_val=-10, max_val=10)
if self.triggerSrc == '':
print('Using start trigger from SRS delay generator')
task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source="/Dev1/PFI0", trigger_edge=constants.Edge.RISING)#step by step measurement doesn't need the trigger
#startTrigger
#task2.di_channels.add_di_chan(lines="Dev1/PFI0")#PFI0 is the trigger from the Standform delayer generator
#task.ci_channels.add_ci_count_edges_chan(counter="Dev1/ctr0")
#task.timing.cfg_samp_clk_timing(rate=self.sampRate, source=self.triggerSrc,sample_mode=constants.AcquisitionType.FINITE, samps_per_chan=self.dataSize)
task.timing.cfg_samp_clk_timing(rate=self.sampRate,sample_mode=constants.AcquisitionType.FINITE, samps_per_chan=self.dataSize)
#trigger source is empty for fast scan for using internal clock. Use laser trigger as source for step-by-step measurement.
# input_buf_size
task.in_stream.input_buf_size = self.sampRate * 5 # plus some extra space
reader = stream_readers.AnalogMultiChannelReader(task.in_stream)
task.start()
data = np.zeros((10, self.dataSize))
print('Ahhhahahhahaha')
reader.read_many_sample(data, self.dataSize, timeout=constants.WAIT_INFINITELY)
print('Ahhhahahhahaha')
#task.wait_until_done(self.scanTime)
if self.triggerSrc == '':
return data
else:
return np.sum(data,axis=1)/self.sampRate
if __name__ == '__main__':
a = PLLreadout_nidaqmx(scanTime=0.5)
delay =a.read()
delay = np.array(delay).flatten()
print(delay)
plt.plot(delay)
plt.show()