diff --git a/cscs_keygen/credentials_helper.py b/cscs_keygen/credentials_helper.py index 5768132..dd1b9aa 100644 --- a/cscs_keygen/credentials_helper.py +++ b/cscs_keygen/credentials_helper.py @@ -5,12 +5,15 @@ import json import os import re +import sys from abc import ABC, abstractmethod +from pathlib import Path from typing import Dict, NoReturn from attr import define, field, validators -from cscs_keygen.utils import run_command +from cscs_keygen.logger import logger +from cscs_keygen.utils import get_command_path, run_command class CredentialsHelperError(Exception): @@ -24,17 +27,30 @@ class CredsHelper(ABC): backend: str = field(validator=validators.in_(["bw", "op", "1p"])) backend_token: str = field(init=False) backend_name: str = field(init=False) + backend_cli: Path = field(init=False) item_name: str = "" _is_unlocked: bool = False __credentials: dict = field(factory=dict, init=False) def __attrs_post_init__(self) -> None: if self.backend == "bw": + # Backend is Bitwarden self.backend_token = "BW_SESSION" self.backend_name = "Bitwarden" - elif self.backend in ["op", "1p"]: + cli_name = "bw" + else: + # Must be 1Password self.backend_token = "OP_SERVICE_ACCOUNT_TOKEN" self.backend_name = "1Password" + cli_name = "op" + + if not (cli_path := get_command_path(cli_name)): + logger.error( + f"{self.backend_name} CLI not found in PATH. Please install it." + ) + sys.exit(1) + + self.backend_cli = cli_path @property def credentials(self) -> Dict[str, str]: @@ -100,7 +116,10 @@ def fetch_credentials(self) -> Dict[str, str]: for __field in ("username", "password", "totp"): self.credentials[__field] = str( - run_command(f'bw get {__field} "{self.item_name}" --raw', text=True) + run_command( + f'{self.backend_cli} get {__field} "{self.item_name}" --raw', + text=True, + ) ).strip() return self.credentials @@ -133,7 +152,8 @@ def fetch_credentials(self) -> Dict[str, str]: creds = json.loads( str( run_command( - f'op item get "{self.item_name}" --format json --fields label=username,password,totp', + f'{self.backend_cli} item get "{self.item_name}" ' + "--format json --fields label=username,password,totp", text=True, ) ) diff --git a/cscs_keygen/logger.py b/cscs_keygen/logger.py index a53fd33..3ac7abe 100644 --- a/cscs_keygen/logger.py +++ b/cscs_keygen/logger.py @@ -100,7 +100,7 @@ def warning(self, message: str) -> None: ) @BaseLogger.should_log(LogLevel.ERROR) - def error(self, message: str) -> None: + def error(self, message: str, exc: Optional[Exception] = None) -> None: """Log an error message.""" self.console.print( self._get_timestamp(), @@ -108,6 +108,12 @@ def error(self, message: str) -> None: Text(message, style="error"), ) + if exc: + self.console.print( + Text(" ↳ ", style="error"), + Text(f"{exc.__class__.__name__}: {exc!s}", style="error"), + ) + @BaseLogger.should_log(LogLevel.DEBUG) def debug(self, message: str) -> None: """Log a debug message.""" diff --git a/cscs_keygen/utils.py b/cscs_keygen/utils.py index fb63586..dced215 100644 --- a/cscs_keygen/utils.py +++ b/cscs_keygen/utils.py @@ -3,8 +3,10 @@ """ import shlex +import shutil import subprocess as sp import sys +from pathlib import Path from typing import Optional import requests @@ -91,3 +93,10 @@ def get_keys_from_api( return key_response.private, key_response.public return None, None + + +def get_command_path(command: str) -> Optional[Path]: + """Check if a command is available in the PATH""" + if cmd := shutil.which(command): + return Path(cmd) + return None