From fe1e9c24e3ce0bb1387af2c2a2d566db83a78129 Mon Sep 17 00:00:00 2001 From: Harry Tung Date: Tue, 25 Jun 2024 17:03:18 -0700 Subject: [PATCH] Adding ONVIF discovery for supported RTSP Cameras (#40) * Added new file for rtsp onvif discovery * Updated package imports * Added function to generate rtsp urls * Fixed fetch url error * Added basic functions for RTSP discovery and Pydantic model for ONVIF data * Added ONVIFDeviceInfo into package * Fixed bug in getting RTSP URLs with default credentials * Added comments and tests * Bumped FrameGrab version to 0.5.3 * Fixed subscripting error for Python 3.7 * Fixed issue with unsupported Type subscripting in Python 3.7 * Added instructions * Automatically reformatting code with black and isort * Updated comments and instructions * Changed return type for generate_rtsp_urls * Added different discovery modes * Automatically reformatting code with black and isort * Added link to RTSP discovery * Automatically reformatting code with black and isort * Updated readme * Fixed tests * Fixed unsupported type hinting for python 3.8 * Changed light discovery to only look for two default credentials (no password) * Automatically reformatting code with black and isort * Added support to old python version (3.8) * Automatically reformatting code with black and isort * Fixed incorrect comments * Updated ONVIF discovery based on feedback * Automatically reformatting code with black and isort * Updated Readme and added comments --------- Co-authored-by: Auto-format Bot --- README.md | 39 ++++++- pyproject.toml | 5 +- src/framegrab/__init__.py | 4 + src/framegrab/rtsp_discovery.py | 196 ++++++++++++++++++++++++++++++++ test/test_rtsp_discovery.py | 31 +++++ 5 files changed, 273 insertions(+), 2 deletions(-) create mode 100644 src/framegrab/rtsp_discovery.py create mode 100644 test/test_rtsp_discovery.py diff --git a/README.md b/README.md index e11798e..3d9cd8b 100644 --- a/README.md +++ b/README.md @@ -174,7 +174,7 @@ The table below shows all available configurations and the cameras to which they In addition to the configurations in the table above, you can set any Basler camera property by including `options.basler.`. For example, it's common to set `options.basler.PixelFormat` to `RGB8`. ### Autodiscovery -Autodiscovery automatically connects to all cameras that are plugged into your machine or discoverable on the network, including `generic_usb`, `realsense` and `basler` cameras. Default configurations will be loaded for each camera. Please note that RTSP streams cannot be discovered in this manner; RTSP URLs must be specified in the configurations. +Autodiscovery automatically connects to all cameras that are plugged into your machine or discoverable on the network, including `generic_usb`, `realsense` and `basler` cameras. Default configurations will be loaded for each camera. Please note that RTSP streams cannot be discovered in this manner; RTSP URLs must be specified in the configurations or can be discovered using a separate tool below. Autodiscovery is great for simple applications where you don't need to set any special options on your cameras. It's also a convenient method for finding the serial numbers of your cameras (if the serial number isn't printed on the camera). ```python @@ -187,6 +187,43 @@ for grabber in grabbers.values(): grabber.release() ``` +#### RTSP Discovery +RTSP cameras with support for ONVIF can be discovered on your local network in the following way: + +```python +from framegrab import RTSPDiscovery, ONVIFDeviceInfo + +devices = RTSPDiscovery.discover_camera_ips() +``` + +The `discover_onvif_devices()` will provide a list of devices that it finds in the `ONVIFDeviceInfo` format. An optional mode `auto_discover_modes` can be used to try different default credentials to fetch RTSP URLs: + +- disable: Disable guessing camera credentials. +- light: Only try first two usernames and passwords ("admin:admin" and no username/password). +- complete_fast: Try the entire DEFAULT_CREDENTIALS without delays in between. +- complete_slow: Try the entire DEFAULT_CREDENTIALS with a delay of 1 seconds in between. + + +After getting the list and enter the username and password of the camera. Use `generate_rtsp_urls()` to generate RTSP URLs for each devices. + +```python +for device in devices: + RTSPDiscovery.generate_rtsp_urls(device=device) +``` + +This will generate all the available RTSP URLs and can be used when creating `FrameGrabber.create_grabbers` to grab frames. + +```python +config = f""" +name: Front Door Camera +input_type: rtsp +id: + rtsp_url: {device.rtsp_urls[0]} +""" + +grabber = FrameGrabber.create_grabber_yaml(config) +``` + ### Motion Detection To use the built-in motion detection functionality, first create a `MotionDetector` object, specifying the percentage threshold for motion detection: diff --git a/pyproject.toml b/pyproject.toml index 0ff8062..3faef40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "framegrab" -version = "0.5.2" +version = "0.5.3" description = "Easily grab frames from cameras or streams" authors = ["Groundlight "] license = "MIT" @@ -15,6 +15,9 @@ pyyaml = "^6.0.1" imgcat = "^0.5.0" click = "^8.1.6" ascii-magic = "^2.3.0" +wsdiscovery = "^2.0.0" +onvif-zeep = "^0.2.12" +pydantic = "2.5.3" [tool.poetry.group.dev.dependencies] black = "^23.3.0" diff --git a/src/framegrab/__init__.py b/src/framegrab/__init__.py index 16d33e2..64792b7 100644 --- a/src/framegrab/__init__.py +++ b/src/framegrab/__init__.py @@ -1,6 +1,7 @@ from .cli.clitools import preview_image from .grabber import FrameGrabber from .motion import MotionDetector +from .rtsp_discovery import AutodiscoverModes, ONVIFDeviceInfo, RTSPDiscovery try: import importlib.metadata @@ -16,5 +17,8 @@ __all__ = [ "FrameGrabber", "MotionDetector", + "RTSPDiscovery", + "ONVIFDeviceInfo", + "AutodiscoverModes", "preview_image", ] diff --git a/src/framegrab/rtsp_discovery.py b/src/framegrab/rtsp_discovery.py new file mode 100644 index 0000000..ac04113 --- /dev/null +++ b/src/framegrab/rtsp_discovery.py @@ -0,0 +1,196 @@ +import logging +import time +import urllib.parse +from enum import Enum +from typing import List, Optional + +from onvif import ONVIFCamera +from pydantic import BaseModel +from wsdiscovery import QName +from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery + +logger = logging.getLogger(__name__) + + +# Default credentials to try when connecting to RTSP cameras, used in discover_camera_ips() when try_default_logins=True +DEFAULT_CREDENTIALS = [ + ("admin", "admin"), + ("", ""), + ("admin", ""), + ("root", "camera"), + ("root", "root"), + ("admin", "12345"), + ("admin", "123456"), + ("admin", "password"), + ("user", "user"), + ("root", "pass"), +] + +""" +Enum for camera discovery modes. Options to try different default credentials stored in DEFAULT_CREDENTIALS. +Consists of four options: + disable: Disable guessing camera credentials. + light: Only try first two usernames and passwords ("admin:admin" and no username/password). + complete_fast: Try the entire DEFAULT_CREDENTIALS without delays in between. + complete_slow: Try the entire DEFAULT_CREDENTIALS with a delay of 1 seconds in between. +""" + + +class AutodiscoverModes(str, Enum): + disable = "disable" + light = "light" + complete_fast = "complete_fast" + complete_slow = "complete_slow" + + +""" +Model for storing ONVIF Device Information: + ip[str]: IP address of the RTSP Camera. + port[int]: Port number of the RTSP Camera. Defaults to 80 if not specified. + username[str]: Username. + password[str]: Password. + xddr[str]: ONVIF service address. + rtsp_urls[List[str]]: List of RTSP URLs for the camera. +""" + + +class ONVIFDeviceInfo(BaseModel): + ip: str + port: Optional[int] = 80 + username: Optional[str] = "" + password: Optional[str] = "" + xaddr: Optional[str] = "" + rtsp_urls: Optional[List[str]] = [] + + +class RTSPDiscovery: + """Simple RTSP camera discovery with ONVIF capabilities""" + + @staticmethod + def discover_onvif_devices( + auto_discover_modes: AutodiscoverModes = AutodiscoverModes.disable, + ) -> List[ONVIFDeviceInfo]: + """ + Uses WSDiscovery to find ONVIF supported devices. + + Parameters: + auto_discover_modes (AutodiscoverModes, optional): Options to try different default credentials stored in DEFAULT_CREDENTIALS. + Consists of four options: + disable: Disable guessing camera credentials. + light: Only try first two usernames and passwords ("admin:admin" and no username/password). + complete_fast: Try the entire DEFAULT_CREDENTIALS without delays in between. + complete_slow: Try the entire DEFAULT_CREDENTIALS with a delay of 1 seconds in between. + Defaults to disable. + + Returns: + List[ONVIFDeviceInfo]: A list of ONVIFDeviceInfos with IP address, port number, and ONVIF service address. + """ + + device_ips = [] + logger.debug("Starting WSDiscovery for ONVIF devices") + wsd = WSDiscovery() + wsd.start() + types = [QName("http://www.onvif.org/ver10/network/wsdl", "NetworkVideoTransmitter")] + ret = wsd.searchServices(types=types) + for service in ret: + xaddr = service.getXAddrs()[0] + parsed_url = urllib.parse.urlparse(xaddr) + ip = parsed_url.hostname + port = parsed_url.port or 80 # Use the default port 80 if not specified + + logger.debug(f"Found ONVIF service at {xaddr}") + device_ip = ONVIFDeviceInfo(ip=ip, port=port, username="", password="", xaddr=xaddr, rtsp_urls=[]) + + if auto_discover_modes is not AutodiscoverModes.disable: + RTSPDiscovery._try_logins(device=device_ip, auto_discover_modes=auto_discover_modes) + + device_ips.append(device_ip) + wsd.stop() + return device_ips + + @staticmethod + def generate_rtsp_urls(device: ONVIFDeviceInfo) -> List[str]: + """ + Fetch RTSP URLs from an ONVIF supported device, given a username/password. + + Parameters: + device (ONVIFDeviceInfo): Pydantic Model that stores information about camera RTSP address, port number, username, and password. + + Returns: + List[str]: A list of RTSP URLs, empty list if error fetching URLs or incorrect credentials. + """ + + rtsp_urls = [] + try: + # Assuming port 80, adjust if necessary + cam = ONVIFCamera(device.ip, device.port, device.username, device.password) + # Create media service + media_service = cam.create_media_service() + # Get profiles + profiles = media_service.GetProfiles() + stream_setup = { + "Stream": "RTP-Unicast", # Specify the type of stream + "Transport": {"Protocol": "RTSP"}, + } + + # For each profile, get the RTSP URL + for profile in profiles: + stream_uri = media_service.GetStreamUri({"StreamSetup": stream_setup, "ProfileToken": profile.token}) + rtsp_urls.append(stream_uri.Uri) + except Exception as e: + msg = str(e).lower() + if "auth" in msg: # looks like a bad login - give up. + return rtsp_urls + else: + logger.error(f"Error fetching RTSP URL for {device.ip}: {e}", exc_info=True) + return rtsp_urls + + # Now insert the username/password into the URLs + for i, url in enumerate(rtsp_urls): + rtsp_urls[i] = url.replace("rtsp://", f"rtsp://{device.username}:{device.password}@") + + device.rtsp_urls = rtsp_urls + return rtsp_urls + + def _try_logins(device: ONVIFDeviceInfo, auto_discover_modes: AutodiscoverModes) -> bool: + """ + Fetch RTSP URLs from an ONVIF supported device, given a username/password. + + Parameters: + device (ONVIFDeviceInfo): Pydantic Model that stores information about camera RTSP address, port number, username, and password. + auto_discover_modes (AutodiscoverModes | None, optional): Options to try different default credentials stored in DEFAULT_CREDENTIALS. + Consists of four options: + disable: Disable guessing camera credentials. + light: Only try first two usernames and passwords ("admin:admin" and no username/password). + complete_fast: Try the entire DEFAULT_CREDENTIALS without delays in between. + complete_slow: Try the entire DEFAULT_CREDENTIALS with a delay of 1 seconds in between. + + Returns: + bool: False if the device is unreachable or the credentials are wrong, else returns True and updates ONVIFDeviceInfo with updated rtsp_urls. + """ + + credentials = DEFAULT_CREDENTIALS + + if auto_discover_modes == AutodiscoverModes.disable: + return False + + if auto_discover_modes == AutodiscoverModes.light: + credentials = DEFAULT_CREDENTIALS[:2] + + for username, password in credentials: + logger.debug(f"Trying {username}:{password} for device IP {device.ip}") + + device.username = username + device.password = password + + # Try generate rtsp urls for that device, if username or password incorrect try next + if RTSPDiscovery.generate_rtsp_urls(device=device): + logger.debug(f"RTSP URL fetched successfully with {username}:{password} for device IP {device.ip}") + return True + + if auto_discover_modes == AutodiscoverModes.complete_slow: + time.sleep(1) + + # Return False when there are no correct credentials + logger.debug(f"Unable to find RTSP URLs for device IP {device.ip}") + return False diff --git a/test/test_rtsp_discovery.py b/test/test_rtsp_discovery.py new file mode 100644 index 0000000..4957d94 --- /dev/null +++ b/test/test_rtsp_discovery.py @@ -0,0 +1,31 @@ +import unittest + +from wsdiscovery.service import Service +from unittest.mock import patch +from framegrab.rtsp_discovery import RTSPDiscovery, ONVIFDeviceInfo, AutodiscoverModes + + +class TestRTSPDiscovery(unittest.TestCase): + def test_discover_camera_ips(self): + service = Service( + types="", scopes="", xAddrs=["http://localhost:8080"], epr="", instanceId="" + ) + with patch( + "wsdiscovery.discovery.ThreadedWSDiscovery.searchServices", + return_value=[service], + ) as mock_camera_ips: + devices = RTSPDiscovery.discover_onvif_devices() + + assert devices[0].xaddr == "http://localhost:8080" + + def test_generate_rtsp_urls(self): + device = ONVIFDeviceInfo(ip="0") + + assert [] == RTSPDiscovery.generate_rtsp_urls(device=device) + assert device.rtsp_urls == [] + + def test_try_logins(self): + device = ONVIFDeviceInfo(ip="0") + + assert False == RTSPDiscovery._try_logins(device=device, auto_discover_modes=AutodiscoverModes.complete_fast) + assert device.rtsp_urls == []