-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathRWaveAnalysis.py
81 lines (71 loc) · 3.38 KB
/
RWaveAnalysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import collections
import datetime
import isodate
class RWaveAnalysis:
def __init__(self, notch, featureThreshold, timespan, bufferLength):
self.notch = notch
self.featureThreshold = featureThreshold
self.timespan = timespan
self.bufferLength = bufferLength
# initializes rWaveBuffer as a Python deque, a doubly linked list for O(1) pushing and popping
self.rWaveBuffer = collections.deque()
self.rWaveMaxBuffer = []
self.captureStream = False
# sets default heartrate before buffer builds
self.heartRate = '65'
def addToBuffer(self, timeStamp):
# converts ISO 8601 string timestamp to Python Datetime Object without timezone info
dateTimeObj = isodate.parse_datetime(timeStamp).replace(tzinfo=None)
self.rWaveBuffer.append(dateTimeObj)
if len(self.rWaveBuffer) > self.bufferLength:
self.rWaveBuffer.popleft()
def checkBuffer(self):
# creates list of RR-intervals
timeDiffs = [(self.rWaveBuffer[i+1] - self.rWaveBuffer[i]).total_seconds() for i in range(len(self.rWaveBuffer)-1)]
self.getHeartRate(timeDiffs)
# filters list of RR-intervals for abnormal length intervals specified by timespan
significantFeatures = [timeDiffs[i] for i in range(len(timeDiffs)) if timeDiffs[i] < self.timespan]
if len(significantFeatures) > self.featureThreshold:
self.resetFeatures()
return {'statusCode':'404', 'heartRate': self.heartRate}
else:
self.resetFeatures()
return {'statusCode':'200', 'heartRate': self.heartRate}
def findRWavePeak(self, data):
# catches cases where incoming data has abnormal amplitude buildup from race conditions on the hardware, i.e. data['amplitude'] = 4.512.2553.233
if len(data['amplitude']) > 5:
data['amplitude'] = data['amplitude'][0:5]
# starts recording when notch threshold is broken. Commented part can be used for real data and adjusted for accuracy
if float(data['amplitude']) > self.notch and not self.captureStream: # and isodate.parse_datetime(data['time']) > ((self.rWaveBuffer[len(self.rWaveBuffer) - 1] + datetime.timedelta(0, 0, 0, 600))):
self.captureStream = True
self.rWaveMaxBuffer.append(data)
# commented section below should be used for real data. builds real rWaveMaxBuffer to extract peak
# elif self.captureStream and float(data['amplitude']) > self.notch:
# self.rWaveMaxBuffer.append(data)
if len(self.rWaveMaxBuffer):
self.captureStream = False
# finds peak in rWaveMaxBuffer and records time of peak
rWavePeakTime = max(self.rWaveMaxBuffer, key=lambda x: float(x['amplitude']))['time']
self.addToBuffer(rWavePeakTime)
self.resetRWaveMaxBuffer()
def analyze(self, data):
self.findRWavePeak(data)
# uncomment below for ASCII data graph in console
# self.drawData(data)
return self.checkBuffer()
def getHeartRate(self, timeDiffs):
if not timeDiffs or len(timeDiffs) < 5:
self.heartRate = self.heartRate
else:
# calculates BPM
self.heartRate = format(60 / (reduce(lambda x, y: x + y, timeDiffs) / len(timeDiffs)), '.0f')
def resetFeatures(self):
self.significantFeatures = 0
def resetRWaveMaxBuffer(self):
self.rWaveMaxBuffer = []
# visualize data with ASCII console graph, adjusted by offset
def drawData(self, data):
offset = 90
for idx in xrange(int(float(data['amplitude']) ** 3 - offset)):
print('|'),
print('\n'),