diff --git a/pyproject.toml b/pyproject.toml
index 594f3f1c..8e92c374 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -45,7 +45,6 @@ dependencies = [
     "knowit>=0.5.5",
     "platformdirs>=3",
     "pysubs2>=1.7",
-    "rarfile>=2.7",
     "requests>=2.0",
     "srt>=3.5",
     "stevedore>=3.0",
@@ -55,6 +54,7 @@ dependencies = [
 # extras
 # https://peps.python.org/pep-0621/#dependencies-optional-dependencies
 [project.optional-dependencies]
+rar = ["rarfile>=2.7"]
 docs = [
     "sphinx",
     "sphinx_rtd_theme>=2",
diff --git a/src/subliminal/archives.py b/src/subliminal/archives.py
new file mode 100644
index 00000000..4adb04ba
--- /dev/null
+++ b/src/subliminal/archives.py
@@ -0,0 +1,144 @@
+"""Archive-related functions."""
+
+from __future__ import annotations
+
+import logging
+import operator
+import os
+import warnings
+from pathlib import Path
+from zipfile import BadZipfile
+
+from guessit import guessit  # type: ignore[import-untyped]
+
+from .exceptions import ArchiveError
+from .video import VIDEO_EXTENSIONS, Video
+
+logger = logging.getLogger(__name__)
+
+
+try:
+    from rarfile import (  # type: ignore[import-untyped]
+        BadRarFile,
+        Error,
+        NotRarFile,
+        RarCannotExec,
+        RarFile,
+        is_rarfile,
+    )
+
+    #: Supported archive extensions (.rar)
+    ARCHIVE_EXTENSIONS: tuple[str, ...] = ('.rar',)
+
+    #: Supported archive errors
+    ARCHIVE_ERRORS: tuple[type[Exception], ...] = (ArchiveError, BadZipfile, BadRarFile)
+
+except ImportError:
+    # rarfile is an optional dependency (the `rar` extra): without it no archive format is supported
+    #: Supported archive extensions
+    ARCHIVE_EXTENSIONS = ()
+
+    #: Supported archive errors
+    ARCHIVE_ERRORS = (ArchiveError, BadZipfile)
+
+
+def is_supported_archive(filename: str) -> bool:
+    """Check if an archive format is supported and warn to install additional modules.
+
+    :param str filename: name or path of the file to check.
+    :return: `True` if the archive can be read with the installed modules.
+    :rtype: bool
+    """
+    lowered = filename.lower()
+    if lowered.endswith(ARCHIVE_EXTENSIONS):
+        return True
+
+    # a rar archive without rar support: tell the user how to enable it
+    if lowered.endswith('.rar'):
+        msg = 'Install the rarfile module to be able to read rar archives.'
+        warnings.warn(msg, UserWarning, stacklevel=2)
+
+    return False
+
+
+def scan_archive(path: str | os.PathLike, name: str | None = None) -> Video:  # pragma: no cover
+    """Scan an archive from a `path`.
+
+    :param str path: existing path to the archive.
+    :param str name: if defined, name to use with guessit instead of the path.
+    :return: the scanned video.
+    :rtype: :class:`~subliminal.video.Video`
+    :raises: :class:`ArchiveError`: error opening the archive or unsupported archive format.
+    """
+    path = Path(path)
+
+    # rar, the rarfile names are only defined when '.rar' is in ARCHIVE_EXTENSIONS
+    if '.rar' in ARCHIVE_EXTENSIONS and path.name.lower().endswith('.rar'):
+        try:
+            return scan_archive_rar(path, name=name)
+        except (Error, NotRarFile, RarCannotExec, ValueError) as e:
+            msg = f'Error scanning rar archive {path.name!r}: {e}'
+            raise ArchiveError(msg) from e
+
+    msg = f'{path.suffix!r} is not a supported archive extension'
+    raise ArchiveError(msg)
+
+
+def scan_archive_rar(path: str | os.PathLike, name: str | None = None) -> Video:  # pragma: no cover
+    """Scan a rar archive from a `path`.
+
+    :param str path: existing path to the archive.
+    :param str name: if defined, name to use with guessit instead of the path.
+    :return: the scanned video.
+    :rtype: :class:`~subliminal.video.Video`
+    :raises: :class:`ValueError`: video path is not well defined.
+    """
+    path = os.fspath(path)
+    # check for non-existing path
+    if not os.path.exists(path):  # pragma: no cover
+        msg = 'Path does not exist'
+        raise ValueError(msg)
+
+    if not is_rarfile(path):
+        msg = f'{os.path.splitext(path)[1]!r} is not a valid archive'
+        raise ValueError(msg)
+
+    dir_path, filename = os.path.split(path)
+
+    logger.info('Scanning archive %r in %r', filename, dir_path)
+
+    # Get filename and file size from RAR, closing the archive as soon as its content is listed
+    with RarFile(path) as rar:
+        # check that the rar doesnt need a password
+        if rar.needs_password():
+            msg = 'Rar requires a password'
+            raise ValueError(msg)
+
+        # raise an exception if the rar file is broken
+        # must be called to avoid a potential deadlock with some broken rars
+        rar.testrar()
+
+        # compare lowercased names so upper-case extensions (e.g. `.MKV`) are found too
+        file_infos = [
+            f for f in rar.infolist() if not f.isdir() and f.filename.lower().endswith(VIDEO_EXTENSIONS)
+        ]
+
+    # no video found
+    if not file_infos:
+        msg = 'No video in archive'
+        raise ValueError(msg)
+
+    # the largest video in the archive is the one we want, there may be samples or intros
+    file_info = max(file_infos, key=operator.attrgetter('file_size'))
+
+    # guess
+    video_filename = file_info.filename
+    video_path = os.path.join(dir_path, video_filename)
+
+    repl = name if name else video_path
+    video = Video.fromguess(video_path, guessit(repl))
+
+    # size
+    video.size = file_info.file_size
+
+    return video
diff --git a/src/subliminal/core.py b/src/subliminal/core.py
index f9acc86e..f60cc00d 100644
--- a/src/subliminal/core.py
+++ b/src/subliminal/core.py
@@ -9,12 +9,12 @@
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
 from typing import TYPE_CHECKING, Any
-from zipfile import BadZipfile
 
 from babelfish import Language, LanguageReverseError  # type: ignore[import-untyped]
 from guessit import guessit  # type: ignore[import-untyped]
-from rarfile import BadRarFile, Error, NotRarFile, RarCannotExec, RarFile, is_rarfile  # type: ignore[import-untyped]
 
+from .archives import ARCHIVE_ERRORS, ARCHIVE_EXTENSIONS, is_supported_archive, scan_archive
+from .exceptions import ArchiveError
 from .extensions import (
     discarded_episode_refiners,
     discarded_movie_refiners,
@@ -36,10 +36,6 @@
     from subliminal.providers import Provider
     from subliminal.score import ComputeScore
 
-
-#: Supported archive extensions (.rar)
-ARCHIVE_EXTENSIONS = ('.rar',)
-
 logger = logging.getLogger(__name__)
 
 
@@ -200,7 +196,7 @@ def download_subtitle(self, subtitle: Subtitle) -> bool:
         logger.info('Downloading subtitle %r', subtitle)
         try:
             self[subtitle.provider_name].download_subtitle(subtitle)
-        except (BadZipfile, BadRarFile):  # pragma: no cover
+        except ARCHIVE_ERRORS:  # pragma: no cover
             logger.exception('Bad archive for subtitle %r', subtitle)
         except Exception as e:  # noqa: BLE001
             handle_exception(e, f'Discarding provider {subtitle.provider_name}')
@@ -511,67 +507,6 @@ def scan_video(path: str | os.PathLike, name: str | None = None) -> Video:
     return video
 
 
-def scan_archive(path: str | os.PathLike, name: str | None = None) -> Video:  # pragma: no cover
-    """Scan an archive from a `path`.
-
-    :param str path: existing path to the archive.
-    :param str name: if defined, name to use with guessit instead of the path.
-    :return: the scanned video.
-    :rtype: :class:`~subliminal.video.Video`
-    :raises: :class:`ValueError`: video path is not well defined.
-    """
-    path = os.fspath(path)
-    # check for non-existing path
-    if not os.path.exists(path):  # pragma: no cover
-        msg = 'Path does not exist'
-        raise ValueError(msg)
-
-    if not is_rarfile(path):
-        msg = f'{os.path.splitext(path)[1]!r} is not a valid archive'
-        raise ValueError(msg)
-
-    dir_path, filename = os.path.split(path)
-
-    logger.info('Scanning archive %r in %r', filename, dir_path)
-
-    # Get filename and file size from RAR
-    rar = RarFile(path)
-
-    # check that the rar doesnt need a password
-    if rar.needs_password():
-        msg = 'Rar requires a password'
-        raise ValueError(msg)
-
-    # raise an exception if the rar file is broken
-    # must be called to avoid a potential deadlock with some broken rars
-    rar.testrar()
-
-    file_infos = [f for f in rar.infolist() if not f.isdir() and f.filename.endswith(VIDEO_EXTENSIONS)]
-
-    # sort by file size descending, the largest video in the archive is the one we want, there may be samples or intros
-    file_infos.sort(key=operator.attrgetter('file_size'), reverse=True)
-
-    # no video found
-    if not file_infos:
-        msg = 'No video in archive'
-        raise ValueError(msg)
-
-    # Free the information about irrelevant files before guessing
-    file_info = file_infos[0]
-
-    # guess
-    video_filename = file_info.filename
-    video_path = os.path.join(dir_path, video_filename)
-
-    repl = name if name else video_path
-    video = Video.fromguess(video_path, guessit(repl))
-
-    # size
-    video.size = file_info.file_size
-
-    return video
-
-
 def scan_videos(
     path: str | os.PathLike,
     *,
@@ -663,10 +598,10 @@ def scan_videos(
                 except ValueError:  # pragma: no cover
                     logger.exception('Error scanning video')
                     continue
-            elif archives and filename.lower().endswith(ARCHIVE_EXTENSIONS):  # archive
+            elif archives and is_supported_archive(filename):  # archive
                 try:
                     video = scan_archive(filepath, name=name)
-                except (Error, NotRarFile, RarCannotExec, ValueError):  # pragma: no cover
+                except (ArchiveError, ValueError):  # pragma: no cover
                     logger.exception('Error scanning archive')
                     continue
             else:  # pragma: no cover
diff --git a/src/subliminal/exceptions.py b/src/subliminal/exceptions.py
index dd61f197..4b3659b8 100644
--- a/src/subliminal/exceptions.py
+++ b/src/subliminal/exceptions.py
@@ -7,6 +7,12 @@ class Error(Exception):
     pass
 
 
+class ArchiveError(Error):
+    """Exception raised when reading an archive fails."""
+
+    pass
+
+
 class ProviderError(Error):
     """Exception raised by providers."""
 