From fbd9e08e84d649c4bb92bc566808f49d47229212 Mon Sep 17 00:00:00 2001 From: Francesco Faraone Date: Thu, 22 Feb 2024 22:04:45 +0100 Subject: [PATCH] Add rich click --- poetry.lock | 21 +++++- pyproject.toml | 1 + sud/cli.py | 70 +++++++++++-------- sud/click.py | 160 +++++++++++++++++++++++++++++++++++++++++++ sud/config.py | 24 +++++-- sud/exceptions.py | 9 ++- sud/prompt.py | 132 +++++++++++++++++++++++++++++++++++ tests/conftest.py | 5 +- tests/test_config.py | 34 +++++---- 9 files changed, 400 insertions(+), 56 deletions(-) create mode 100644 sud/click.py create mode 100644 sud/prompt.py diff --git a/poetry.lock b/poetry.lock index e0edad4..cccde5b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1121,6 +1121,25 @@ pygments = ">=2.13.0,<3.0.0" [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "rich-click" +version = "1.7.3" +description = "Format click help output nicely with rich" +optional = false +python-versions = ">=3.7" +files = [ + {file = "rich-click-1.7.3.tar.gz", hash = "sha256:bced1594c497dc007ab49508ff198bb437c576d01291c13a61658999066481f4"}, + {file = "rich_click-1.7.3-py3-none-any.whl", hash = "sha256:bc4163d4e2a3361e21c4d72d300eca6eb8896dfc978667923cb1d4937b8769a3"}, +] + +[package.dependencies] +click = ">=7" +rich = ">=10.7.0" +typing-extensions = "*" + +[package.extras] +dev = ["flake8", "flake8-docstrings", "mypy", "packaging", "pre-commit", "pytest", "pytest-cov", "types-setuptools"] + [[package]] name = "six" version = "1.16.0" @@ -1255,4 +1274,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.10,<4" -content-hash = "aff2e7d40e5554ec1e3394019a03e7da5b091960e0a25f14e074f649fcd40ba8" +content-hash = "0156b190b2824f20fd988afa8347fcb3df7d8fb0c3874dbe8d967ab7aa8b3387" diff --git a/pyproject.toml b/pyproject.toml index 12e8733..4970e4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ rich = "^13.7.0" pyyaml = "^6.0.1" python-telegram-bot = "^20.8" humanize = "^4.9.0" +rich-click = "^1.7.3" [tool.poetry.scripts] sud = "sud.cli:main" diff --git a/sud/cli.py b/sud/cli.py index 52b1cbb..bf5e9d1 100644 --- a/sud/cli.py +++ b/sud/cli.py @@ -1,55 +1,65 @@ import logging -import click -from click import Abort, ClickException, Option, UsageError from rich.console import Console from rich.logging import RichHandler -from sud import get_version -from sud.config import Config +from sud import click as click +from sud import config, get_version from sud.updater import Updater console = Console() -class MutuallyExclusiveOption(Option): - def __init__(self, *args, **kwargs): - self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) - help = kwargs.get("help", "") - if self.mutually_exclusive: - ex_str = ", ".join(self.mutually_exclusive) - kwargs["help"] = help + ( - " NOTE: This option is mutually exclusive " - f"with options: [{ex_str}]." - ) - super(MutuallyExclusiveOption, self).__init__(*args, **kwargs) - - def handle_parse_result(self, ctx, opts, args): - if self.mutually_exclusive.intersection(opts) and self.name in opts: - raise UsageError( - f"Illegal usage: `{self.name}` is mutually exclusive with " - f"options `{', '.join(self.mutually_exclusive)}`." - ) - - @click.group() @click.option( "-c", "--config-file", - type=click.File("rb"), + type=click.Path( + dir_okay=False, + readable=True, + writable=True, + ), default="/etc/sud/sud-config.yml", + help="Load the configuration file from a specific location.", ) @click.version_option(get_version()) -@click.pass_context +@config.pass_config def cli(ctx, config_file): """ SUD the Python Scaleway DNS Updater utility. """ - ctx.obj = Config(config_file) + + +@cli.command(load_config=False) +# @click.option( +# "-H", +# "--hostname", +# required=True, +# prompt="[cyan]Hostname[/cyan]", +# ) +# @click.option( +# "-s", +# "--api-secret", +# required=True, +# prompt="[cyan]Scaleway API secret[/cyan]", +# hide_input=True, +# confirmation_prompt=True, +# ) +@click.option( + "-s", + "--frequency", + required=True, + prompt="Check frequency", + default=300, + show_default=True, +) +@config.pass_config +def init(config, frequency): + pass @cli.command() -@click.pass_obj +@config.pass_config def run(config): logging.basicConfig( level=logging.INFO, @@ -64,9 +74,9 @@ def run(config): def main(): try: cli(standalone_mode=False) - except ClickException as e: + except click.ClickException as e: console.print(f"[bold red]Error:[/bold red] {str(e)}") - except Abort: + except click.Abort: pass except Exception: console.print_exception() diff --git a/sud/click.py b/sud/click.py new file mode 100644 index 0000000..26a8c91 --- /dev/null +++ b/sud/click.py @@ -0,0 +1,160 @@ +# flake8: noqa: F401 +from typing import Any, Sequence + +import rich_click +from click.types import ParamType +from rich_click import Context as Context +from rich_click import Abort as Abort +from rich_click import ClickException as ClickException +from rich_click import Path as Path +from rich_click import version_option as version_option + +from sud.prompt import confirm, prompt + + +class SudOption(rich_click.Option): + def __init__( + self, + param_decls: Sequence[str] | None = None, + show_default: bool | str | None = None, + prompt: bool | str = False, + confirmation_prompt: bool | str = False, + prompt_required: bool = True, + hide_input: bool = False, + is_flag: bool | None = None, + flag_value: Any | None = None, + multiple: bool = False, + count: bool = False, + allow_from_autoenv: bool = True, + type: ParamType | Any | None = None, + help: str | None = None, + hidden: bool = False, + show_choices: bool = True, + show_envvar: bool = False, + **attrs: Any, + ) -> None: + self.default_is_missing = "default" not in attrs + super().__init__( + param_decls, + show_default, + prompt, + confirmation_prompt, + prompt_required, + hide_input, + is_flag, + flag_value, + multiple, + count, + allow_from_autoenv, + type, + help, + hidden, + show_choices, + show_envvar, + **attrs, + ) + + def prompt_for_value(self, ctx: Context) -> Any: + assert self.prompt is not None + + # Calculate the default before prompting anything to be stable. + default = self.get_default(ctx) + # If this is a prompt for a flag we need to handle this + # differently. + if self.is_bool_flag: + return confirm(self.prompt, default) + + return prompt( + self.prompt, + default=default, + type=self.type, + hide_input=self.hide_input, + show_choices=self.show_choices, + confirmation_prompt=self.confirmation_prompt, + value_proc=lambda x: self.process_value(ctx, x), + default_is_missing=self.default_is_missing, + ) + + +class MutuallyExclusiveOption(SudOption): + def __init__(self, *args, **kwargs): + self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) + help = kwargs.get("help", "") + if self.mutually_exclusive: + ex_str = ", ".join(self.mutually_exclusive) + kwargs["help"] = help + ( + " NOTE: This option is mutually exclusive " + f"with options: [{ex_str}]." + ) + super(MutuallyExclusiveOption, self).__init__(*args, **kwargs) + + def handle_parse_result(self, ctx, opts, args): + if self.mutually_exclusive.intersection(opts) and self.name in opts: + raise rich_click.UsageError( + f"Illegal usage: `{self.name}` is mutually exclusive with " + f"options `{', '.join(self.mutually_exclusive)}`." + ) + + +class SudCommand(rich_click.Command): + def __init__(self, *args, **kwargs): + self.load_config = kwargs.pop("load_config", True) + super().__init__(*args, **kwargs) + + def invoke(self, ctx): + if self.load_config: + ctx.obj.load() + return super().invoke(ctx) + + +class SudGroup(rich_click.Group): + def command(self, *args, **kwargs): + from rich_click.decorators import command + + kwargs["cls"] = SudCommand + + def decorator(f): + cmd = command(*args, **kwargs)(f) + self.add_command(cmd) + return cmd + + return decorator + + def group(self, *args, **kwargs): + from rich_click.decorators import group + + kwargs["cls"] = SudGroup + + def decorator(f): + cmd = group(*args, **kwargs)(f) + self.add_command(cmd) + return cmd + + return decorator + + +def group(name=None, **attrs): + attrs.setdefault("cls", SudGroup) + return rich_click.command(name, **attrs) + + +def option( + *param_decls: str, + **attrs: Any, +): + attrs.setdefault("cls", SudOption) + return rich_click.option( + *param_decls, + **attrs, + ) + + +def mutually_exclusive_option( + *param_decls: str, + **attrs: Any, +): + attrs.setdefault("cls", MutuallyExclusiveOption) + return rich_click.option( + *param_decls, + **attrs, + ) diff --git a/sud/config.py b/sud/config.py index a2c8c1a..3e2a0d2 100644 --- a/sud/config.py +++ b/sud/config.py @@ -1,6 +1,8 @@ from datetime import timedelta +from functools import update_wrapper import yaml +from click import pass_context from sud.exceptions import SudException @@ -10,8 +12,8 @@ class Config: DEFAULT_FREQUENCY = 300 def __init__(self, config_file): - self._config = self._load_config(config_file) - self._validate_config() + self._config_file = config_file + self._config = None @property def hostname(self) -> str: @@ -33,13 +35,25 @@ def api_secret(self) -> str: def telegram(self) -> dict | None: return self._config.get("notifications", {}).get("telegram") - def _load_config(self, config_file): + def load(self): try: - return yaml.safe_load(config_file) + with open(self._config_file, "r") as f: + self._config = yaml.safe_load(f) except yaml.YAMLError as e: raise SudException( f"Invalid SUD configuration file: {str(e)}", ) from e - def _validate_config(self): + def validate(self): pass + + +def pass_config(f): + @pass_context + def new_func(ctx, *args, **kwargs): + obj = ctx.find_object(Config) + if not obj: + ctx.obj = obj = Config(ctx.params["config_file"]) + return ctx.invoke(f, obj, *args, **kwargs) + + return update_wrapper(new_func, f) diff --git a/sud/exceptions.py b/sud/exceptions.py index a6b6e35..44a7bbd 100644 --- a/sud/exceptions.py +++ b/sud/exceptions.py @@ -1,6 +1,5 @@ -class SudException(Exception): - def __init__(self, message): - self.message = message +from rich_click import ClickException - def __str__(self) -> str: - return self.message + +class SudException(ClickException): + pass diff --git a/sud/prompt.py b/sud/prompt.py new file mode 100644 index 0000000..4325bfe --- /dev/null +++ b/sud/prompt.py @@ -0,0 +1,132 @@ +import copy +from typing import Any, Callable, List + +from click.types import Choice, ParamType, convert_type +from rich import print +from rich.console import Console +from rich.prompt import Confirm, DefaultType, InvalidResponse +from rich.prompt import Prompt as _Prompt +from rich.text import Text + + +def confirm( + text: str, + default: bool | None = False, +): + return Confirm.ask(text, default=default) + + +class Prompt(_Prompt): + + def __init__( + self, + prompt: str | Text = "", + *, + console: Console | None = None, + password: bool = False, + choices: List[str] | None = None, + show_default: bool = True, + show_choices: bool = True, + value_proc: Callable[[str], Any] | None = None, + ) -> None: + self.value_proc = value_proc + super().__init__( + prompt, + console=console, + password=password, + choices=choices, + show_default=show_default, + show_choices=show_choices, + ) + + def make_prompt(self, default: DefaultType) -> Text: + prompt = self.prompt.copy() + prompt.end = "" + + if self.show_choices and self.choices: + _choices = "/".join(self.choices) + choices = f"[{_choices}]" + prompt.append(" ") + prompt.append(choices, "prompt.choices") + + if default != ... and self.show_default: + prompt.append(" ") + _default = self.render_default(default) + prompt.append(_default) + + prompt.append(self.prompt_suffix) + + return prompt + + def process_response(self, value: str) -> str: + value = value.strip() + try: + return_value = self.value_proc(value) if self.value_proc else value + except ValueError: + raise InvalidResponse(self.validate_error_message) + + if self.choices is not None and not self.check_choice(value): + raise InvalidResponse(self.illegal_choice_message) + + return return_value + + +def prompt( + text: str, + default: Any | None = None, + hide_input: bool = False, + confirmation_prompt: bool | str = False, + type: ParamType | Any | None = None, + value_proc: Callable[[str], Any] | None = None, + prompt_suffix: str = ": ", + show_default: bool = True, + err: bool = False, + show_choices: bool = True, + default_is_missing: bool = True, +): + if value_proc is None: + value_proc = convert_type(type, default) + + prompt_kwargs: dict[str, Any] = { + "prompt": text, + "password": hide_input, + "show_default": show_default, + "show_choices": show_choices, + "value_proc": value_proc, + } + + if type is not None and show_choices and isinstance(type, Choice): + prompt_kwargs["choices"] = type.choices + + if confirmation_prompt: + if confirmation_prompt is True: + confirmation_prompt = "Repeat for confirmation" + + prompt2_kwargs = copy.copy(prompt_kwargs) + prompt2_kwargs["prompt"] = confirmation_prompt + + prompt = Prompt(**prompt_kwargs) + prompt.prompt_suffix = prompt_suffix + prompt.value_proc = value_proc + + while True: + value = prompt(default=default if not default_is_missing else ...) + + if not confirmation_prompt: + return value + + prompt2 = Prompt(**prompt2_kwargs) + prompt2.prompt_suffix = prompt_suffix + + while True: + value2 = prompt2( + default=default if not default_is_missing else ..., + ) + is_empty = not value and not value2 + if value2 or is_empty: + break + + if value == value2: + return value + + print("Error: The two entered values do not match.") diff --git a/tests/conftest.py b/tests/conftest.py index 2cfc11a..154ae66 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -58,5 +58,6 @@ def config(mocker, config_file_factory, telegram_config_factory): config_file = config_file_factory( notifications={"telegram": telegram_config_factory}, ) - mocker.patch.object(Config, "_load_config", return_value=config_file) - return Config(mocker.MagicMock()) + mocker.patch.object(Config, "load") + c = Config(mocker.MagicMock()) + c._config = config_file diff --git a/tests/test_config.py b/tests/test_config.py index afcdcf5..ff04547 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -9,15 +9,18 @@ def test_load_basic_config(mocker, config_file_factory): config_file = config_file_factory() - safe_load = mocker.patch( + m_safe_load = mocker.patch( "sud.config.yaml.safe_load", return_value=config_file, ) - fake_file_obj = mocker.MagicMock() - - c = Config(fake_file_obj) - - safe_load.assert_called_once_with(fake_file_obj) + file_obj = mocker.MagicMock() + m_open_ctxt_mgr = mocker.MagicMock() + m_open_ctxt_mgr.__enter__.return_value = file_obj + m_open = mocker.patch("sud.config.open", return_value=m_open_ctxt_mgr) + c = Config("config.yml") + c.load() + m_open.assert_called_once_with("config.yml", "r") + m_safe_load.assert_called_once_with(file_obj) assert c.hostname == config_file["hostname"] assert c.api_secret == config_file["api_secret"] assert isinstance(c.frequency, timedelta) @@ -33,15 +36,19 @@ def test_load_full_config( frequency=123, notifications={"telegram": telegram_config}, ) - safe_load = mocker.patch( + m_safe_load = mocker.patch( "sud.config.yaml.safe_load", return_value=config_file, ) - fake_file_obj = mocker.MagicMock() - - c = Config(fake_file_obj) + file_obj = mocker.MagicMock() + m_open_ctxt_mgr = mocker.MagicMock() + m_open_ctxt_mgr.__enter__.return_value = file_obj + m_open = mocker.patch("sud.config.open", return_value=m_open_ctxt_mgr) + c = Config("config.yml") + c.load() + m_open.assert_called_once_with("config.yml", "r") + m_safe_load.assert_called_once_with(file_obj) - safe_load.assert_called_once_with(fake_file_obj) assert c.hostname == config_file["hostname"] assert c.api_secret == config_file["api_secret"] assert isinstance(c.frequency, timedelta) @@ -54,10 +61,11 @@ def test_yaml_parse_error(mocker): "sud.config.yaml.safe_load", side_effect=yaml.YAMLError("parse error"), ) - fake_file_obj = mocker.MagicMock() + mocker.patch("sud.config.open") + c = Config("config.yml") with pytest.raises(SudException) as raised: - Config(fake_file_obj) + c.load() assert ( raised.value.message == "Invalid SUD configuration file: parse error"