diff --git a/datalad_next/credman/backend.py b/datalad_next/credman/backend.py new file mode 100644 index 00000000..a5fac965 --- /dev/null +++ b/datalad_next/credman/backend.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from typing import ( + Generator, +) +from .credential import ( + Credential, + Secret, +) + + +class CredentialBackend: + """Interface to be implemented for any CredentialManager backend""" + def __getitem__(self, identifier: str) -> Credential: + """Retrieve a particular credential + + The returned credentials contains all known properties and a secret + (if available). + + Parameters + ---------- + identifier: str + The identifier of the credential to be retrieved. + + Returns + ------- + dict + A mapping of credential properties to their values. The property key + of a credential's secret is 'secret'. + """ + raise NotImplementedError + + def __setitem__(self, identifier: str, value: Credential): + """Set a particular credential. + + Parameters + ---------- + identifier: str + The identifier of the credential to be deleted. + value: Credential + The credential to set. + """ + raise NotImplementedError + + def __delitem__(self, identifier: str): + """Delete a particular credential + + Parameters + ---------- + identifier: str + The identifier of the credential to be deleted. + """ + raise NotImplementedError + + def __iter__(self) -> Generator[str, None, None]: + """Yields the identifiers of all known credentials""" + raise NotImplementedError + + def __contains__(self, identifier: str) -> bool: + raise NotImplementedError + + def get_credential_type(self, type_id: str | None = None) -> Type[Credential]: + # the only recognized standard credentials is comprised (at minimum) + # of a 'secret' only. + # particular backends may support additional/different credentials. + # the reason why backends get a say in this at all is that they + # need to ensure that the "secret" (in whatever disguise it is + # presented) actually is put into a secret store -- which may + # be different from the place all other credential properties + # are stored. + # The credential manager is asking the backend on what information + # it needs to know, and how they should be labeled, and feeds + # the collected credential info in this format back to the backend. + return Secret diff --git a/datalad_next/credman/backend_gitcfg_keyring.py b/datalad_next/credman/backend_gitcfg_keyring.py new file mode 100644 index 00000000..aa5b342f --- /dev/null +++ b/datalad_next/credman/backend_gitcfg_keyring.py @@ -0,0 +1,221 @@ +from typing import ( + Dict, + Generator, + Type, +) + +from datalad_next.exceptions import ( + CapturedException, + CommandError, +) +from .backend import CredentialBackend +from .credential import ( + Credential, + Secret, + UserPassword, + Token, +) + +_credential_types = { + 'secret': Secret, + 'user_password': UserPassword, + 'token': Token, +} + + +class GitConfigKeyringBackend(CredentialBackend): + _cfg_section_prefix = 'datalad.credential.' + + def __init__(self, cfg=None): + """ + + Parameters + ---------- + cfg: ConfigManager, optional + If given, all configuration queries are performed using this + ``ConfigManager`` instance. Otherwise ``datalad.cfg`` is used. + """ + self._cfg = cfg + self.__keyring = None + + def get_credential_type( + self, type_id: str | None = None) -> Type[Credential]: + return _credential_types.get(type_id, Secret) + + def __getitem__(self, identifier: str) -> Credential: + var_prefix = self._get_cred_cfg_var(identifier, '') + # get any related info from config + cred_props = { + k[len(var_prefix):]: v + for k, v in self._cfg.items() + if k.startswith(var_prefix) + } + cred = self.get_credential_type(cred_props.get('type'))(**cred_props) + # be statisfied with a secret from the config + if cred.secret: + return cred + + # otherwise ask the secret store, always get the uniform + # 'secret' field + secret = self._keyring.get(identifier, 'secret') + + if not cred_props and not secret: + # no properties, no secret + raise KeyError(f'No credential with name {identifier!r}') + + cred.secret = secret + return cred + + def _get_credential_secret(self, identifier: str) -> str: + return self._keyring.get(identifier, 'secret') + + def __setitem__(self, identifier: str, value: Credential): + # update record, which properties did actually change? + # this is not always the same as the input, e.g. when + # a property would be _set_ at global scope, but is already + # defined at system scope + # TODO is this actually a good thing to do? What if the + # higher scope drops the setting and cripples a user setup? + # the feature-angle here is that an existing piece of + # information is not copied into the user-domain, where it + # is then fixed, and a system update cannot alter it without + # user intervention + updated = {} + + # remove props + # + remove_props = [ + k for k, v in properties.items() if v is None and k != 'secret'] + self._unset_credprops_anyscope(identifier, remove_props) + updated.update(**{k: None for k in remove_props}) + + # set non-secret props + # + set_props = { + k: v for k, v in properties.items() + if v is not None and k != 'secret' + } + for k, v in set_props.items(): + var = self._get_cred_cfg_var(identifier, k) + if self._cfg.get(var) == v: + # desired value already exists, we are not + # storing again to preserve the scope it + # was defined in + continue + # we always write to the global scope (ie. user config) + # credentials are typically a personal, not a repository + # specific entity -- likewise secrets go into a personal + # not repository-specific store + # for custom needs users can directly set the respective + # config + self._cfg.set(var, v, scope='global', force=True, reload=False) + updated[k] = v + if set_props: + # batch reload + self._cfg.reload() + + # at this point we will have a secret. it could be from ENV + # or provided, or entered. we always want to put it in the + # store + if 'secret' in properties: + # TODO at test for setting with secret=None as a property + # this would cause a credential without a secret. Is this possible? + # Is this desirable? If so, document and support explicitly. And + # add a test + self._keyring.set(identifier, 'secret', properties['secret']) + updated['secret'] = properties['secret'] + return updated + + def __delitem__(self, identifier: str): + # prefix for all config variables of this credential + prefix = self._get_cred_cfg_var(identifier, '') + + to_remove = [ + k[len(prefix):] for k in self._cfg.keys() + if k.startswith(prefix) + ] + if to_remove: + self._unset_credprops_anyscope(identifier, to_remove) + + # we always use the uniform 'secret' field + self._delete_keyring_field(identifier, 'secret') + + def __iter__(self) -> Generator[str, None, None]: + yield from self._get_known_credential_names() + + def __contains__(self, identifier: str) -> bool: + return identifier in self._get_known_credential_names() + + def _delete_keyring_field(self, record: str, field: str): + # delete the secret from the keystore, if there is any + try: + self._keyring.delete(record, field) + except Exception as e: + if self._keyring.get(record, field) is None: + # whatever it was, the target is reached + CapturedException(e) + else: + # we could not delete the field + raise # pragma: nocover + + @property + def _keyring(self): + """Returns the DataLad keyring wrapper + + This internal property may vanish whenever changes to the supported + backends are made. + """ + if self.__keyring: + return self.__keyring + from datalad.support.keyring_ import keyring + self.__keyring = keyring + return keyring + + def _get_known_credential_names(self) -> set: + known_credentials = set( + '.'.join(k.split('.')[2:-1]) for k in self._cfg.keys() + if k.startswith(self._cfg_section_prefix) + ) + return known_credentials + + def _unset_credprops_anyscope(self, name, keys): + """Reloads the config after unsetting all relevant variables + + This method does not modify the keystore. + """ + nonremoved_vars = [] + for k in keys: + var = self._get_cred_cfg_var(name, k) + if var not in self._cfg: + continue + try: + self._cfg.unset(var, scope='global', reload=False) + except CommandError as e: + CapturedException(e) + try: + self._cfg.unset(var, scope='local', reload=False) + except CommandError as e: + CapturedException(e) + nonremoved_vars.append(var) + if nonremoved_vars: + raise RuntimeError( + f"Cannot remove configuration items {nonremoved_vars} " + f"for credential, defined outside global or local " + "configuration scope. Remove manually") + self._cfg.reload() + + def _get_cred_cfg_var(self, name, prop): + """Return a config variable name for a credential property + + Parameters + ---------- + name : str + Credential name + prop : str + Property name + + Returns + ------- + str + """ + return f'{self._cfg_section_prefix}{name}.{prop}' diff --git a/datalad_next/credman/backend_gitcfg_keyring_legacy.py b/datalad_next/credman/backend_gitcfg_keyring_legacy.py new file mode 100644 index 00000000..7d2b582a --- /dev/null +++ b/datalad_next/credman/backend_gitcfg_keyring_legacy.py @@ -0,0 +1,101 @@ +from typing import ( + Dict, + Generator, +) + +from datalad_next.exceptions import ( + CapturedException, + CommandError, +) +from datalad_next.utils.credman_backend_gitcfg_keyring import ( + GitConfigKeyringBackend, +) + + +class GitConfigKeyringWithLegacySupportBackend(GitConfigKeyringBackend): + def get_credential(self, identifier): + # if we have a chance to query for stored legacy credentials + # we do this first to have the more modern parts of the + # system overwrite them reliably + cred = self._get_legacy_field_from_keyring(identifier, type) or {} + + # retrieve properties from config + cred.update(super().get_credential(identifier)) + + secret = self._cfg.get(self._get_cred_cfg_var(identifier, 'secret')) + + return cred + + def _get_secret_from_keyring(self, name, type_hint=None): + """ + Returns + ------- + str or None + None is return when no secret for the given credential name + could be found. Otherwise, the secret is returned. + """ + # always get the uniform + secret = self._keyring.get(name, 'secret') + if secret: + return secret + # fall back on a different "field" that is inferred from the + # credential type + secret_field = self._cred_types.get( + type_hint, {}).get('secret') + if not secret_field: + return + # first try to get it from the config to catch any overrides + secret = self._cfg.get(self._get_cred_cfg_var(name, secret_field)) + if secret is not None: + return secret + secret = self._keyring.get(name, secret_field) + return secret + + def delete_credential(self, + identifier: str, + type_hint: str | None = None) -> bool: + super().delete_credential(self, identifier) + + # remove legacy records too + for field in self._cred_types.get( + type_hint, {}).get('fields', []): + self._delete_keyring_field(identifier, field) + + def list_credentials(self) -> Generator[str, None, None]: + from datalad.downloaders.providers import Providers + + # we must deduplicate between modern and legacy credentials + # first modern ones + reported = set() + for name in super().list_credentials(): + if name not in reported: + yield name + reported.add(name) + + # and the legacy ones + for name in set( + p.credential.name + for p in Providers.from_config_files() + if p.credential + ): + if name not in reported: + yield name + reported.add(name) + + def _get_legacy_field_from_keyring(self, name, type_hint): + if not type_hint or type_hint not in self._cred_types: + return + + cred = {} + lc = self._cred_types[type_hint] + for field in (lc['fields'] or []): + if field == lc['secret']: + continue + val = self._keyring.get(name, field) + if val: + # legacy credentials used property names with underscores, + # but this is no longer syntax-compliant -- fix on read + cred[field.replace('_', '-')] = val + if 'type' not in cred: + cred['type'] = type_hint + return cred diff --git a/datalad_next/credman/credential.py b/datalad_next/credman/credential.py new file mode 100644 index 00000000..0293d81b --- /dev/null +++ b/datalad_next/credman/credential.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from collections.abc import Mapping +from types import MappingProxyType +from typing import Generator + + +# this must implement enough of the dict API to work as a drop-in +# replacement. +# this is specifically not like the Credential class in datalad-core. +# this one does UI interactions, and interfaces operations with +# a secret store. here we only aim at providing a container for +# a credential. +# we make it a mapping, such that **credential works +class Credential(Mapping): + #_secret_field = must be a str + #_mandatory_props = must be a container like a tuple + def __init__(self, *args, **kwargs): + # _props is the true information store + # most of what this class does is filter information access to + # this dict + self._props = dict(*args, **kwargs) + self._props['type'] = self.type_label + + def __getitem__(self, key: str) -> str: + return self._props[key] + + def __setitem__(self, key: str, value: str | None): + if key == 'type' and self._props.get(key) != value: + raise KeyError( + 'Must not change credential type, create new instance instead') + self._props[key] = value + + def __delitem__(self, key: str): + del self._props[key] + + def __iter__(self) -> Generator[str, None, None]: + yield from self._props + + def __contains__(self, key: str) -> bool: + return key in self._props + + def __repr__(self) -> str: + return f'{self.__class__.__name__}({self._props.__repr__()})' + + def __len__(self) -> int: + return len(self._props) + + def get(self, key, default=None): + return self._props.get(key, default) + + @property + def secret(self) -> str: + return self._props.get(self._secret_field) + + @secret.setter + def secret(self, val: str): + self._props[self._secret_field] = val + + @property + def secret_fieldname(self): + return self._secret_field + + @property + def properties(self) -> MappingProxyType: + # read-only view + return MappingProxyType( + {k: v for k, v in self._props.items() if k != self._secret_field} + ) + + @property + def missing_properties(self) -> set: + # which properties are explicitly set to None + missing = set( + k for k, v in self._props.items() if v is None + ) + # which props are declared mandatory and have no value or do not + # exist at all + missing.update( + k for k in self._mandatory_props if not self._props.get(k) + ) + return missing + + @property + def type_label(self): + return self.__class__.__name__.lower() + + +class Secret(Credential): + # standard credential, a secret plus any credentials + _secret_field = 'secret' + _mandatory_props = tuple() + + +class UserPassword(Credential): + # standard credential, a secret plus any credentials + _secret_field = 'password' + _mandatory_props = ('user', 'password') + + @property + def type_label(self): + # name choice has historic reasons, this is what datalad-core + # did from its infancy + return 'user_password' + + +class Token(Credential): + _secret_field = 'token' + _mandatory_props = tuple() diff --git a/datalad_next/credman/manager.py b/datalad_next/credman/manager.py index f26b9d75..596795a5 100644 --- a/datalad_next/credman/manager.py +++ b/datalad_next/credman/manager.py @@ -26,16 +26,15 @@ ) import datalad -from datalad_next.exceptions import ( - CapturedException, - CommandError, -) + from datalad_next.uis import ui_switcher as ui +from .credential import Credential + lgr = logging.getLogger('datalad.credman') -class CredentialManager(object): +class CredentialManager: """Facility to get, set, remove and query credentials. A credential in this context is a set of properties (key-value pairs) @@ -88,12 +87,24 @@ def __init__(self, cfg=None): ``ConfigManager`` instance. Otherwise ``datalad.cfg`` is used. """ self.__cfg = cfg - self.__cred_types = None - self.__keyring = None + + # TODO this should move into a factory function + from .backend_gitcfg_keyring import ( + GitConfigKeyringBackend, + ) + self._backend = GitConfigKeyringBackend(cfg=self._cfg) + #from .backend_gitcfg_keyring_legacy import ( + # GitConfigKeyringWithLegacySupportBackend, + #) + #self._backend = GitConfigKeyringWithLegacySupportBackend(cfg=self._cfg) # main API # - def get(self, name=None, _prompt=None, _type_hint=None, **kwargs): + def get(self, + name=None, + _prompt=None, + _type_hint=None, + **kwargs) -> Credential: """Get properties and secret of a credential. This is a read-only method that never modifies information stored @@ -129,18 +140,19 @@ def get(self, name=None, _prompt=None, _type_hint=None, **kwargs): is used to determine a credential type, to possibly enable further lookup/entry of additional properties for a known credential type **kwargs: - Credential property name/value pairs. For any property with a value + Credential property name/value pairs to amend a retrieved credential, + or to populate a new credential. For any property with a value of ``None``, manual data entry will be performed, unless a value could be retrieved on lookup, or prompting was not enabled. Returns ------- - dict or None - Return ``None``, if no secret for the credential was found or entered. - Otherwise returns the complete credential record, comprising all - properties and the secret. An additional ``_edited`` key with a - value of ``True`` is added whenever the returned record contains - manually entered information. + Credential or None + Return ``None``, if no secret for the credential was found or + entered. Otherwise returns the complete credential record, + comprising all properties and the secret. An additional ``_edited`` + key with a value of ``True`` is added whenever the returned record + contains manually entered information. Raises ------ @@ -153,79 +165,57 @@ def get(self, name=None, _prompt=None, _type_hint=None, **kwargs): raise ValueError( 'CredentialManager.get() called without any identifying ' 'information') - if name is None: + + cred = None + if name: + # try retrieve + try: + cred = self._backend[name] + except KeyError: + if not kwargs and not _type_hint: + # no credential found, and nothing to work with + return + + if cred is None: # there is no chance we can retrieve any stored properties - # but we can prompt for some below - cred = {} - else: - # if we have a chance to query for stored legacy credentials - # we do this first to have the more modern parts of the - # system overwrite them reliably - cred = self._get_legacy_field_from_keyring( - name, kwargs.get('type', _type_hint)) or {} - - var_prefix = _get_cred_cfg_var(name, '') - # get related info from config - cred.update({ - k[len(var_prefix):]: v - for k, v in self._cfg.items() - if k.startswith(var_prefix) - }) - - # final word on the credential type - _type_hint = cred.get('type', kwargs.get('type', _type_hint)) - if _type_hint: - # import the definition of expected fields from the known - # credential types - cred_type = self._cred_types.get( - _type_hint, - dict(fields=[], secret=None)) - for k in (cred_type['fields'] or []): - if k == cred_type['secret'] or k in kwargs: - # do nothing, if this is the secret key - # or if we have an incoming value for this key already - continue - # otherwise make sure we prompt for the essential - # fields - kwargs[k] = None + # but we can prompt for some below. + # ask the backend for a credential template of the given + # type(hint) + cred = self._backend.get_credential_type( + kwargs.get('type', _type_hint))() - prompted = False + # update the credential with given properties (or askme flag) for k, v in kwargs.items(): - if k == 'secret': - # handled below - continue - if _prompt and v is None and cred.get(k) is None: - # ask if enabled, no value was provided, - # and no value is already on record + # only update if a value was provided (None is a flag for + # saying "prompt for this, unless you have it already"), + # or if no value is set in the credential + if v is not None or not cred.get(k): + cred[k] = v + + # prompt for missing values, if enabled, no value was provided, + # and no value is already on record + prompted = False + if _prompt: + for k in cred.missing_properties: v = self._ask_property(k, None if prompted else _prompt) if v is not None: prompted = True if v: cred[k] = v - # start locating the secret at the method parameters - secret = kwargs.get('secret') - if secret is None and name: - # get the secret, from the effective config, not just the keystore - secret = self._get_secret(name, type_hint=_type_hint) - if _prompt and secret is None: + # start locating the secret at the method parameters, and then in the + # retrieved credential + if _prompt and cred.secret is None: secret = self._ask_secret( - type_hint=self._cred_types.get( - _type_hint, {}).get('secret'), + type_hint=cred.secret_fieldname, prompt=None if prompted else _prompt, ) if secret: prompted = True + cred.secret = secret - if secret: - cred['secret'] = secret - - if 'type' not in cred and kwargs.get('type'): - # enhance legacy credentials - cred['type'] = kwargs.get('type') - - if not cred: - # if there is absolutely nothing to report, report None + if cred.missing_properties: + # if the credential is incomplete report None return # report whether there were any edits to the credential record @@ -299,10 +289,10 @@ def set(self, ValueError When property names in kwargs are not syntax-compliant. """ - updated = {} + entered_name = None if not name: - known_credentials = self._get_known_credential_names() + known_credentials = self._backend if _suggested_name in known_credentials: # ignore name suggestion, conflicts with existing credential _suggested_name = None @@ -334,7 +324,7 @@ def set(self, 'exists, please provide a different name.') else: name = entered - updated['name'] = name + entered_name = name # we strip internal properties, such as '_edited' automatically # forcing each caller to to this by hand is kinda pointless, if @@ -342,79 +332,55 @@ def set(self, # would include one for any credentials that was manually entered kwargs = {k: v for k, v in kwargs.items() if not k.startswith('_')} # check syntax for the rest + # TODO should the backend do this? verify_property_names(kwargs) # if we know the type, hence we can do a query for legacy secrets # and properties. This will migrate them to the new setup # over time type_hint = kwargs.get('type') - cred = self._get_legacy_field_from_keyring(name, type_hint) or {} - # amend with given properties - cred.update(**kwargs) - # update last-used, if requested - if _lastused: - cred['last-used'] = datetime.now().isoformat() - - # remove props - # - remove_props = [ - k for k, v in cred.items() if v is None and k != 'secret'] - self._unset_credprops_anyscope(name, remove_props) - updated.update(**{k: None for k in remove_props}) - - # set non-secret props - # - set_props = { - k: v for k, v in cred.items() - if v is not None and k != 'secret' - } - for k, v in set_props.items(): - var = _get_cred_cfg_var(name, k) - if self._cfg.get(var) == v: - # desired value already exists, we are not - # storing again to preserve the scope it - # was defined in - continue - # we always write to the global scope (ie. user config) - # credentials are typically a personal, not a repository - # specific entity -- likewise secrets go into a personal - # not repository-specific store - # for custom needs users can directly set the respective - # config - self._cfg.set(var, v, scope='global', force=True, reload=False) - updated[k] = v - if set_props: - self._cfg.reload() - - # set secret - # - # we aim to update the secret in the store, hence we must - # query for a previous setting in order to reliably report - # updates - prev_secret = self._get_secret_from_keyring(name, type_hint) - if 'secret' not in cred: - # we have no removal directive, reuse previous secret - cred['secret'] = prev_secret - if cred.get('secret') is None: - # we want to reset the secret, consider active config - cred['secret'] = \ - self._cfg.get(_get_cred_cfg_var(name, 'secret')) - - if cred.get('secret') is None: - # we have no secret specified or in the store already: ask + # retrieve any existing information on this credential, this will also + # pull out a secret from a store. We aim to write a secret to the + # store, so the possibly triggered unlocking of a secret store + # is not "extra cost" + cred = self._backend[name] + # amend/overwrite with given properties + to_update = kwargs + + # ensure we have a secret + if cred.get('secret') is None and to_update.get('secret') is None: + # we have no secret specified or in the store already: ask. # typically we would end up here with an explicit attempt # to set a credential in a context that is known to an # interactive user, hence the messaging here can be simple - cred['secret'] = self._ask_secret( + to_update['secret'] = self._ask_secret( CredentialManager.secret_names.get(type_hint, 'secret')) - # at this point we will have a secret. it could be from ENV - # or provided, or entered. we always want to put it in the - # store - self._keyring.set(name, 'secret', cred['secret']) - if cred['secret'] != prev_secret: - # only report updated if actually different from before - updated['secret'] = cred['secret'] + + # update last-used, if requested + if _lastused: + to_update['last-used'] = datetime.now().isoformat() + + self._backend[name] = to_update + updated = _get_credential_updatesself._backend[name] + if entered_name: + updated['name'] = entered_name return updated + def _update_credential(self, name: str, updates: Dict) -> Dict: + prev = self._backend[name] + update + # any properties that is no longer present is indicated with + # a None value + updates = { + p: None for p in prev if p not in curr + } + # anything that differs from its prev value or is new is reported + # as it is now + updates.update({ + k: v for k, v in curr.items() if k not in prev or prev[k] != v + }) + return updates + + def remove(self, name, type_hint=None): """Remove a credential, including all properties and secret @@ -437,39 +403,15 @@ def remove(self, name, type_hint=None): successfully. Likely cause is that it is defined in a configuration scope or backend for which write-access is not supported. """ - # prefix for all config variables of this credential - prefix = _get_cred_cfg_var(name, '') - - to_remove = [ - k[len(prefix):] for k in self._cfg.keys() - if k.startswith(prefix) - ] - removed = False - if to_remove: - self._unset_credprops_anyscope(name, to_remove) - removed = True - - # delete the secret from the keystore, if there is any - def del_field(name, field): - global removed - try: - self._keyring.delete(name, field) - removed = True - except Exception as e: - if self._keyring.get(name, field) is None: - # whatever it was, the target is reached - CapturedException(e) - else: - # we could not delete the field - raise # pragma: nocover - - del_field(name, 'secret') - if type_hint: - # remove legacy records too - for field in self._cred_types.get( - type_hint, {}).get('fields', []): - del_field(name, field) - return removed + # TODO the actual backend API does not support a type_hint parameter + # for delete. This is only left in here, because legacy DataLad + # credentials can only be cleaned up when their type is known. + del self._backend[name] + # TODO backend does not support this, and it also has no real usecase + # if a credential is to be remove, but also does not exist to begin + # with, the outcome is identical to a successful removal of an existing + # credential. Discontinue this behavior + return True def query_(self, **kwargs): """Query for all (matching) credentials. @@ -497,15 +439,11 @@ def query_(self, **kwargs): matching credential. """ done = set() - known_credentials = set((n, None) for n in self._get_known_credential_names()) - from itertools import chain - for name, type_hint in chain( - _yield_legacy_credential_names(), - known_credentials): + for name in self._backend: if name in done: continue done.add(name) - cred = self.get(name, _prompt=None, _type_hint=type_hint) + cred = self.get(name, _prompt=None) if not cred: continue if not kwargs: @@ -673,19 +611,23 @@ def obtain(self, ) try: cred = self.get(**kwargs) - # check if we know the realm, if so include in the credential, - realm = (query_props or {}).get('realm') - if realm: - cred['realm'] = realm - if name is None and type_hint: - # we are not expecting to retrieve a particular credential. - # make the type hint the actual type of the credential - cred['type'] = type_hint except Exception as e: - lgr.debug('Obtaining credential failed: %s', e) + raise ValueError( + 'No suitable credential found or specified') from e if not cred: raise ValueError('No suitable credential found or specified') + + if name is None and type_hint and 'type' not in cred: + # we were not expecting to retrieve a particular credential. + # make the type hint the actual type of the credential + cred['type'] = type_hint + + # check if we know the realm, if so include in the credential, + realm = (query_props or {}).get('realm') + if realm: + cred['realm'] = realm + missing_props = [ ep for ep in (expected_props or []) if ep not in cred ] @@ -697,12 +639,13 @@ def obtain(self, # internal helpers # - def _get_known_credential_names(self) -> set: - known_credentials = set( - '.'.join(k.split('.')[2:-1]) for k in self._cfg.keys() - if k.startswith('datalad.credential.') - ) - return known_credentials + @property + def _cfg(self): + """Return a ConfigManager given to __init__() or the global datalad.cfg + """ + if self.__cfg: + return self.__cfg + return datalad.cfg def _ask_property(self, name, prompt=None): if not ui.is_interactive: @@ -725,102 +668,6 @@ def _ask_secret(self, type_hint=None, prompt=None): 'datalad.credentials.hidden-secret-entry'), ) - def _unset_credprops_anyscope(self, name, keys): - """Reloads the config after unsetting all relevant variables - - This method does not modify the keystore. - """ - nonremoved_vars = [] - for k in keys: - var = _get_cred_cfg_var(name, k) - if var not in self._cfg: - continue - try: - self._cfg.unset(var, scope='global', reload=False) - except CommandError as e: - CapturedException(e) - try: - self._cfg.unset(var, scope='local', reload=False) - except CommandError as e: - CapturedException(e) - nonremoved_vars.append(var) - if nonremoved_vars: - raise RuntimeError( - f"Cannot remove configuration items {nonremoved_vars} " - f"for credential, defined outside global or local " - "configuration scope. Remove manually") - self._cfg.reload() - - def _get_legacy_field_from_keyring(self, name, type_hint): - if not type_hint or type_hint not in self._cred_types: - return - - cred = {} - lc = self._cred_types[type_hint] - for field in (lc['fields'] or []): - if field == lc['secret']: - continue - val = self._keyring.get(name, field) - if val: - # legacy credentials used property names with underscores, - # but this is no longer syntax-compliant -- fix on read - cred[field.replace('_', '-')] = val - if 'type' not in cred: - cred['type'] = type_hint - return cred - - def _get_secret(self, name, type_hint=None): - secret = self._cfg.get(_get_cred_cfg_var(name, 'secret')) - if secret is not None: - return secret - return self._get_secret_from_keyring(name, type_hint) - - def _get_secret_from_keyring(self, name, type_hint=None): - """ - Returns - ------- - str or None - None is return when no secret for the given credential name - could be found. Otherwise, the secret is returned. - """ - # always get the uniform - secret = self._keyring.get(name, 'secret') - if secret: - return secret - # fall back on a different "field" that is inferred from the - # credential type - secret_field = self._cred_types.get( - type_hint, {}).get('secret') - if not secret_field: - return - # first try to get it from the config to catch any overrides - secret = self._cfg.get(_get_cred_cfg_var(name, secret_field)) - if secret is not None: - return secret - secret = self._keyring.get(name, secret_field) - return secret - - @property - def _cfg(self): - """Return a ConfigManager given to __init__() or the global datalad.cfg - """ - if self.__cfg: - return self.__cfg - return datalad.cfg - - @property - def _keyring(self): - """Returns the DataLad keyring wrapper - - This internal property may vanish whenever changes to the supported - backends are made. - """ - if self.__keyring: - return self.__keyring - from datalad.support.keyring_ import keyring - self.__keyring = keyring - return keyring - @property def _cred_types(self): """Internal property for mapping of credential type names to fields. @@ -874,24 +721,6 @@ def _cred_types(self): return mapping -def _yield_legacy_credential_names(): - # query is constrained by non-secrets, no constraints means report all - # a constraint means *exact* match on all given properties - from datalad.downloaders.providers import ( - Providers, - CREDENTIAL_TYPES, - ) - type_hints = {v: k for k, v in CREDENTIAL_TYPES.items()} - - legacy_credentials = set( - (p.credential.name, type(p.credential)) - for p in Providers.from_config_files() - if p.credential - ) - for name, type_ in legacy_credentials: - yield (name, type_hints.get(type_)) - - def verify_property_names(names): """Check credential property names for syntax-compliance. @@ -912,20 +741,3 @@ def verify_property_names(names): raise ValueError( f'Unsupported property names {invalid_names}, ' 'must match git-config variable syntax (a-z0-9 and - characters)') - - -def _get_cred_cfg_var(name, prop): - """Return a config variable name for a credential property - - Parameters - ---------- - name : str - Credential name - prop : str - Property name - - Returns - ------- - str - """ - return f'datalad.credential.{name}.{prop}' diff --git a/datalad_next/credman/tests/test_credman.py b/datalad_next/credman/tests/test_credman.py index 1cd16615..faf5accf 100644 --- a/datalad_next/credman/tests/test_credman.py +++ b/datalad_next/credman/tests/test_credman.py @@ -15,7 +15,11 @@ from datalad.config import ConfigManager from ..manager import ( CredentialManager, - _get_cred_cfg_var, + #_get_cred_cfg_var, +) +from ..credential import ( + Secret, + UserPassword, ) from datalad_next.tests.utils import ( MemoryKeyring, @@ -45,19 +49,18 @@ def check_credmanager(): # but if there is anything, report it # this makes it possible to discover credential fragments, if only to # expose them for clean-up - eq_(credman.get(crazy='empty'), {'crazy': 'empty'}) + eq_(credman.get(crazy='empty'), Secret(crazy='empty')) # smoke test for legacy credential retrieval code # reporting back a credential, even if empty, exposes the legacy # credentials (by name), and enables discovery and (re)setting them # using this newer credential system - eq_(credman.get('donotexiststest', type='user_password'), - {'type': 'user_password'}) + eq_(credman.get('donotexiststest', type='user_password'), None) # does not fiddle with a secret that is readily provided eq_(credman.get('dummy', secret='mike', _type_hint='token'), dict(type='token', secret='mike')) # remove a secret that has not yet been set - eq_(credman.remove('elusive', type_hint='user_password'), False) + credman.remove('elusive', type_hint='user_password') # but the secret was written to the keystore with pytest.raises(ValueError): @@ -80,17 +83,20 @@ def check_credmanager(): # second, no changing the secret, but changing the prop, albeit with # the same value, change report should be empty eq_(credman.set('changed', prop='val'), dict()) - # change secret, with value pulled from config - try: - cfg.set('datalad.credential.changed.secret', 'envsec', - scope='override') - eq_(credman.set('changed', secret=None), dict(secret='envsec')) - finally: - cfg.unset('datalad.credential.changed.secret', scope='override') + # MIH is disabling this test and the associated behavior + # it is very confusing to say `secret=None` -- which has + # elsewhere the semantics of "delete this properties", but as a result + # get a value from a configuration items stored in the secret store + #try: + # cfg.set('datalad.credential.changed.secret', 'envsec', + # scope='override') + # eq_(credman.set('changed', secret=None), dict(secret='envsec')) + #finally: + # cfg.unset('datalad.credential.changed.secret', scope='override') # remove non-existing property, secret not report, because unchanged eq_(credman.set('mycred', dummy=None), dict(dummy=None)) - assert_not_in(_get_cred_cfg_var("mycred", "dummy"), cfg) + #assert_not_in(_get_cred_cfg_var("mycred", "dummy"), cfg) # set property eq_(credman.set('mycred', dummy='good', this='that'), @@ -107,7 +113,7 @@ def check_credmanager(): # test full query and constrained query q = list(credman.query_()) # 3 defined here, plus any number of legacy credentials - assert len(q) > 3 + assert len(q) >= 3 # now query for one of the creds created above q = list(credman.query_(prop='val')) eq_(len(q), 1)