diff --git a/datalad_next/commands/credentials.py b/datalad_next/commands/credentials.py index 18eb9b97..1b4ebea1 100644 --- a/datalad_next/commands/credentials.py +++ b/datalad_next/commands/credentials.py @@ -384,6 +384,8 @@ def custom_result_renderer(res, **kwargs): '✓' if res.get('cred_secret') else '✗', ) + if res.pop('from_backend', None) == 'legacy': + res['type'] = 'legacy-credential' if 'message' not in res: # give the names of all properties # but avoid duplicating the type, hide the prefix, diff --git a/datalad_next/credman/manager.py b/datalad_next/credman/manager.py index efd8c77b..72d8a3ee 100644 --- a/datalad_next/credman/manager.py +++ b/datalad_next/credman/manager.py @@ -161,16 +161,36 @@ def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs): # if we have a chance to query for stored legacy credentials # we do this first to have the more modern parts of the # system overwrite them reliably - cred = self._get_legacy_field_from_keyring( - name, kwargs.get('type', _type_hint)) or {} - + _type_hint = kwargs.get('type', _type_hint) + if not _type_hint: + # no type hint given in any form. Last chance is that + # this is a known legacy credential. + # doing this query is a bit expensive, but getting a + # credential is not a high-performance procedure, and + # the gain in convenience is substantial -- otherwise + # users would need to somehow know what they should be + # looking for + _type_hint = dict(_yield_legacy_credential_types()).get(name) + cred = self._get_legacy_credential_from_keyring( + name, _type_hint) or {} + if cred: + # at least some info came from the legacy backend, record that + cred['_from_backend'] = 'legacy' + + # and now take whatever we got from the legacy store and update + # it from what we have in the config var_prefix = _get_cred_cfg_var(name, '') - # get related info from config - cred.update({ + cred_update = { k[len(var_prefix):]: v for k, v in self._cfg.items() if k.startswith(var_prefix) - }) + } + if set(cred_update.keys()) >= set( + k for k in cred.keys() if k != '_from_backend'): + # we are overwriting all info from a possible legacy credential + # take marker off + cred.pop('_from_backend', None) + cred.update(cred_update) # final word on the credential type _type_hint = cred.get('type', kwargs.get('type', _type_hint)) @@ -220,14 +240,14 @@ def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs): if secret: cred['secret'] = secret - if 'type' not in cred and kwargs.get('type'): - # enhance legacy credentials - cred['type'] = kwargs.get('type') - if not cred: # if there is absolutely nothing to report, report None return + if 'type' not in cred and _type_hint: + # make sure we always report a type whenever we have any clue + cred['type'] = _type_hint + # report whether there were any edits to the credential record # (incl. being entirely new), such that consumers can decide # to save a credentials, once battle-tested @@ -348,7 +368,7 @@ def set(self, # and properties. This will migrate them to the new setup # over time type_hint = kwargs.get('type') - cred = self._get_legacy_field_from_keyring(name, type_hint) or {} + cred = self._get_legacy_credential_from_keyring(name, type_hint) or {} # amend with given properties cred.update(**kwargs) # update last-used, if requested @@ -500,15 +520,21 @@ def query_(self, **kwargs): done = set() known_credentials = set((n, None) for n in self._get_known_credential_names()) from itertools import chain - for name, type_hint in chain( - _yield_legacy_credential_names(), + for name, legacy_type_hint in chain( + _yield_legacy_credential_types(), known_credentials): - if name in done: - continue done.add(name) - cred = self.get(name, _prompt=None, _type_hint=type_hint) - if not cred: - continue + cred = self.get(name, _prompt=None, _type_hint=legacy_type_hint) + if not cred and legacy_type_hint: + # this legacy-type credential is not set. We still want to + # report on it, because it is the only way for users that + # discover these predefined credential "slots" + cred = dict(type=legacy_type_hint) + if legacy_type_hint is not None: + # leading underscore to distinguish this internal marker from + # an actual credential property. + # the credentials command will then also treat it as such + cred['_from_backend'] = 'legacy' if not kwargs: yield (name, cred) else: @@ -752,7 +778,15 @@ def _unset_credprops_anyscope(self, name, keys): "configuration scope. Remove manually") self._cfg.reload() - def _get_legacy_field_from_keyring(self, name, type_hint): + def _get_legacy_credential_from_keyring( + self, + name: str, + type_hint: str, + ) -> Dict | None: + """With a ``type_hint`` given, attempts to retrieve a credential + comprised of all fields defined in ``self._cred_types``. Otherwise + ``None`` is returned. + """ if not type_hint or type_hint not in self._cred_types: return @@ -766,9 +800,12 @@ def _get_legacy_field_from_keyring(self, name, type_hint): # legacy credentials used property names with underscores, # but this is no longer syntax-compliant -- fix on read cred[field.replace('_', '-')] = val - if 'type' not in cred: - cred['type'] = type_hint - return cred + + if not cred: + # there is nothing on a legacy credential with this name + return None + else: + return cred def _get_secret(self, name, type_hint=None): secret = self._cfg.get(_get_cred_cfg_var(name, 'secret')) @@ -875,7 +912,7 @@ def _cred_types(self): return mapping -def _yield_legacy_credential_names(): +def _yield_legacy_credential_types(): # query is constrained by non-secrets, no constraints means report all # a constraint means *exact* match on all given properties from datalad.downloaders.providers import ( @@ -884,9 +921,15 @@ def _yield_legacy_credential_names(): ) type_hints = {v: k for k, v in CREDENTIAL_TYPES.items()} + # ATTN: from Providers.from_config_files() is sensitive to the PWD + # it will only read legacy credentials from datasets whenever + # PWD is inside a dataset legacy_credentials = set( (p.credential.name, type(p.credential)) - for p in Providers.from_config_files() + # without reload, no changes in files since the last call + # will be considered. That last call might have happended + # in datalad-core, and may have been in another directory + for p in Providers.from_config_files(reload=True) if p.credential ) for name, type_ in legacy_credentials: diff --git a/datalad_next/credman/tests/test_credman.py b/datalad_next/credman/tests/test_credman.py index 1cd16615..0e382b7c 100644 --- a/datalad_next/credman/tests/test_credman.py +++ b/datalad_next/credman/tests/test_credman.py @@ -28,6 +28,7 @@ with_testsui, ) from datalad_next.datasets import Dataset +from datalad_next.utils import chpwd def test_credmanager(): @@ -46,12 +47,6 @@ def check_credmanager(): # this makes it possible to discover credential fragments, if only to # expose them for clean-up eq_(credman.get(crazy='empty'), {'crazy': 'empty'}) - # smoke test for legacy credential retrieval code - # reporting back a credential, even if empty, exposes the legacy - # credentials (by name), and enables discovery and (re)setting them - # using this newer credential system - eq_(credman.get('donotexiststest', type='user_password'), - {'type': 'user_password'}) # does not fiddle with a secret that is readily provided eq_(credman.get('dummy', secret='mike', _type_hint='token'), dict(type='token', secret='mike')) @@ -285,3 +280,98 @@ def check_obtain(): assert res == (None, {'type': 'token', 'secret': 'mynewtoken', '_edited': True, 'realm': 'mytotallynewrealm'}) + + +legacy_provider_cfg = """\ +[provider:credmanuniquetestcredentialsetup] +url_re = http://example\\.com/ +authentication_type = http_basic_auth +credential = credmanuniquetestcredentialsetup + +[credential:credmanuniquetestcredentialsetup] +type = user_password +""" + + +def test_legacy_credentials(tmp_path): + # we will use a dataset to host a legacy provider config + ds = Dataset(tmp_path).create(result_renderer='disabled') + provider_path = ds.pathobj / '.datalad' / 'providers' / 'mylegacycred.cfg' + provider_path.parent.mkdir(parents=True, exist_ok=True) + provider_path.write_text(legacy_provider_cfg) + # - the legacy code will only ever pick up a dataset credential, when + # PWD is inside a dataset + # - we want all tests to bypass the actual system keyring + # 'datalad.downloaders.credentials.keyring_' is what the UserPassword + # credential will use to store the credential + # - 'datalad.support.keyring_' is what credman uses + # - we need to make them one and the same thing + tmp_keyring = MemoryKeyring() + with chpwd(tmp_path), \ + patch('datalad.downloaders.credentials.keyring_', tmp_keyring), \ + patch('datalad.support.keyring_.keyring', tmp_keyring): + check_legacy_credentials(ds) + + +def check_legacy_credentials(ds): + # shortcut + cname = 'credmanuniquetestcredentialsetup' + + credman = CredentialManager(ds.config) + # check that we get legacy reports in a query. this is needed to be + # able to even know that they exist + res = dict(credman.query()) + assert cname in res + cred = res[cname] + # we always get the type reported + assert cred['type'] == 'user_password' + # we can know that it is a legacy credential + assert cred['_from_backend'] == 'legacy' + # but getting an unset legacy credential will unambiguously say + # "there is none" + assert credman.get(cname) is None + + # we want all tests to bypass the actual system keyring + # 'datalad.downloaders.credentials.keyring_' is what the UserPassword + # credential will use to store the credential + from datalad.downloaders.credentials import UserPassword + lc = UserPassword(cname, dataset=ds) + lc.set(user='mike', password='pass123') + # now we should be able to get it from credman too + # and just by name -- no need to provide a type hint + cred = credman.get(cname) + assert cred['user'] == 'mike' + # reporting of the secret is always under the 'secret' key + assert cred['secret'] == 'pass123' + assert cred['type'] == 'user_password' + assert cred['_from_backend'] == 'legacy' + + # check migration on set + try: + # setting a credential, will migrate info into the non-legacy + # backend. however, it will not move information _out of_ + # the legacy backend, in order to keep old code working + # with the old info + # confirm starting point: legacy code keeps user in secret store + assert credman._keyring.entries[cname]['user'] == 'mike' + assert ds.config.get(f'datalad.credential.{cname}.user') is None + credman.set(cname, **cred) + # it remains there + assert credman._keyring.entries[cname]['user'] == 'mike' + # but is also migrated + assert ds.config.get(f'datalad.credential.{cname}.user') == 'mike' + # NOTE: This setup is not without problems. Users will update + # a credential and will leave an outdated (half) as garbage. + # however, I did not come up with a better approach that gradually + # brings users over. + credman.set(cname, user='newmike', secret='othersecret') + assert credman._keyring.entries[cname]['user'] == 'mike' + # finally check that the update is reported now + cred = credman.get(cname) + assert cred['user'] == 'newmike' + assert cred['secret'] == 'othersecret' + assert cred['type'] == 'user_password' + # no legacy info makes it out, hence no marker + assert cred.get('_from_backend') != 'legacy' + finally: + credman.remove(cname)