-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathvideostream.py
54 lines (46 loc) · 1.55 KB
/
videostream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import time
import cv2
import string
import threading
from collections import namedtuple
from eventkit import Event
Frame = namedtuple('Frame', ['time', 'image'])
class VideoStream(Event):
"""
Make a video stream available as an event that emits frames::
emit(frame)
"""
def __init__(self, camId=0, width=640, height=480):
Event.__init__(self)
self._args = camId, width, height
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()
def _run(self):
camId, width, height = self._args
if type(camId) is int or camId in string.digits:
camId = int(camId)
isLive = True
else:
isLive = camId.startswith('http://')
capture = cv2.VideoCapture(camId)
if capture:
capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
fps = capture.get(cv2.CAP_PROP_FPS)
while self._running:
capture.grab()
t = time.perf_counter()
retval, im = capture.retrieve()
if not retval:
break
frame = Frame(t, im)
self.emit_threadsafe(frame)
pause = t - time.perf_counter() + 1 / fps
if not isLive and pause > 0:
time.sleep(pause)
capture.release()
self.done_event.emit_threadsafe(self)
def stop(self):
self._running = False
self._thread.join()