-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsensor.py
229 lines (193 loc) · 6.65 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""This module provides an abstraction for the SDS011 air partuclate densiry sensor.
"""
import struct
import serial
#TODO: Commands against the sensor should read the reply and return success status.
class SDS011(object):
"""Provides method to read from a SDS011 air particlate density sensor
using UART.
"""
HEAD = b'\xaa'
TAIL = b'\xab'
CMD_ID = b'\xb4'
# The sent command is a read or a write
READ = b"\x00"
WRITE = b"\x01"
REPORT_MODE_CMD = b"\x02"
ACTIVE = b"\x00"
PASSIVE = b"\x01"
QUERY_CMD = b"\x04"
# The sleep command ID
SLEEP_CMD = b"\x06"
# Sleep and work byte
SLEEP = b"\x00"
WORK = b"\x01"
# The work period command ID
WORK_PERIOD_CMD = b'\x08'
FIRMWARE_VERSION_CMD = b'\x07'
def __init__(self, serial_port, baudrate=9600, timeout=2,
use_query_mode=True):
"""Initialise and open serial port.
"""
self.ser = serial.Serial(port=serial_port,
baudrate=baudrate,
timeout=timeout)
self.ser.flush()
self.set_report_mode(active=not use_query_mode)
def _execute(self, cmd_bytes):
"""Writes a byte sequence to the serial.
"""
self.ser.write(cmd_bytes)
def _get_reply(self):
"""Read reply from device."""
raw = self.ser.read(size=10)
data = raw[2:8]
if len(data) == 0:
return None
if (sum(d for d in data) & 255) != raw[8]:
return None #TODO: also check cmd id
return raw
def cmd_begin(self):
"""Get command header and command ID bytes.
@rtype: list
"""
return self.HEAD + self.CMD_ID
def set_report_mode(self, read=False, active=False):
"""Get sleep command. Does not contain checksum and tail.
@rtype: list
"""
cmd = self.cmd_begin()
cmd += (self.REPORT_MODE_CMD
+ (self.READ if read else self.WRITE)
+ (self.ACTIVE if active else self.PASSIVE)
+ b"\x00" * 10)
cmd = self._finish_cmd(cmd)
self._execute(cmd)
self._get_reply()
def query(self):
"""Query the device and read the data.
@return: Air particulate density in micrograms per cubic meter.
@rtype: tuple(float, float) -> (PM2.5, PM10)
"""
cmd = self.cmd_begin()
cmd += (self.QUERY_CMD
+ b"\x00" * 12)
cmd = self._finish_cmd(cmd)
self._execute(cmd)
raw = self._get_reply()
if raw is None:
return None #TODO:
data = struct.unpack('<HH', raw[2:6])
pm25 = data[0] / 10.0
pm10 = data[1] / 10.0
return (pm25, pm10)
def sleep(self, read=False, sleep=True):
"""Sleep/Wake up the sensor.
@param sleep: Whether the device should sleep or work.
@type sleep: bool
"""
cmd = self.cmd_begin()
cmd += (self.SLEEP_CMD
+ (self.READ if read else self.WRITE)
+ (self.SLEEP if sleep else self.WORK)
+ b"\x00" * 10)
cmd = self._finish_cmd(cmd)
self._execute(cmd)
self._get_reply()
def set_work_period(self, read=False, work_time=0):
"""Get work period command. Does not contain checksum and tail.
@rtype: list
"""
assert work_time >= 0 and work_time <= 30
cmd = self.cmd_begin()
cmd += (self.WORK_PERIOD_CMD
+ (self.READ if read else self.WRITE)
+ bytes([work_time])
+ b"\x00" * 10)
cmd = self._finish_cmd(cmd)
self._execute(cmd)
self._get_reply()
def _finish_cmd(self, cmd, id1=b"\xff", id2=b"\xff"):
"""Add device ID, checksum and tail bytes.
@rtype: list
"""
cmd += id1 + id2
checksum = sum(d for d in cmd[2:]) % 256
cmd += bytes([checksum]) + self.TAIL
return cmd
def _process_frame(self, data):
"""Process a SDS011 data frame.
Byte positions:
0 - Header
1 - Command No.
2,3 - PM2.5 low/high byte
4,5 - PM10 low/high
6,7 - ID bytes
8 - Checksum - sum of bytes 2-7
9 - Tail
"""
raw = struct.unpack('<HHxxBBB', data[2:])
checksum = sum(v for v in data[2:8]) % 256
if checksum != data[8]:
return None
pm25 = raw[0] / 10.0
pm10 = raw[1] / 10.0
return (pm25, pm10)
def read(self):
"""Read sensor data.
@return: PM2.5 and PM10 concetration in micrograms per cude meter.
@rtype: tuple(float, float) - first is PM2.5.
"""
byte = 0
while byte != self.HEAD:
byte = self.ser.read(size=1)
d = self.ser.read(size=10)
if d[0:1] == b"\xc0":
data = self._process_frame(byte + d)
return data
def check_firmware_version(self):
"""
@return: Current firwmware version, YEAR-MONTH-DAY
@rtype: string
"""
cmd = self.cmd_begin()
cmd += (self.FIRMWARE_VERSION_CMD
+ b"\x00" * 12)
cmd = self._finish_cmd(cmd)
self._execute(cmd)
raw = self._get_reply()
year = raw[2]
month =raw[3]
day = raw[4]
return f"{year}-{month}-{day}"
def calculate_aqi_pm25(concentration):
"""
Berechnet den AQI für PM2.5 mittels linearer Interpolation.
Parameter:
concentration (float): PM2.5-Konzentration in µg/m³.
Rückgabe:
(int, str): Gerundeter AQI-Wert und die zugehörige Kategorie.
"""
# Definierte Breakpoints gemäß EPA für PM2.5
breakpoints = [
# (C_lo, C_hi, I_lo, I_hi, Kategorie)
(0.0, 12.0, 0, 50, "Good"),
(12.1, 35.4, 51, 100, "Moderate"),
(35.5, 55.4, 101, 150, "Unhealthy for Sensitive Groups"),
(55.5, 150.4, 151, 200, "Unhealthy"),
(150.5, 250.4, 201, 300, "Very Unhealthy"),
(250.5, 500.4, 301, 500, "Hazardous")
]
if concentration < 0:
return 0, "Ungültige Konzentration"
for C_lo, C_hi, I_lo, I_hi, category in breakpoints:
if C_lo <= concentration <= C_hi:
# Lineare Interpolation
aqi = (I_hi - I_lo) / (C_hi - C_lo) * (concentration - C_lo) + I_lo
return round(aqi), category
# Falls die Konzentration über dem höchsten definierten Intervall liegt
return 0, "AQI außerhalb des definierten Bereichs"
if __name__ == "__main__":
sds011 = SDS011("/dev/ttyUSB0")
print(sds011.query())
sds011.ser.close()