Skip to content

Commit

Permalink
Merge pull request #247 from mih/legacycred
Browse files Browse the repository at this point in the history
Improve legacy credential handling (and test it)
  • Loading branch information
mih authored Feb 18, 2023
2 parents 069a811 + e25ce5e commit 3e10738
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 30 deletions.
2 changes: 2 additions & 0 deletions datalad_next/commands/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ def custom_result_renderer(res, **kwargs):
'✓' if res.get('cred_secret') else '✗',
)

if res.pop('from_backend', None) == 'legacy':
res['type'] = 'legacy-credential'
if 'message' not in res:
# give the names of all properties
# but avoid duplicating the type, hide the prefix,
Expand Down
91 changes: 67 additions & 24 deletions datalad_next/credman/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,36 @@ def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs):
# if we have a chance to query for stored legacy credentials
# we do this first to have the more modern parts of the
# system overwrite them reliably
cred = self._get_legacy_field_from_keyring(
name, kwargs.get('type', _type_hint)) or {}

_type_hint = kwargs.get('type', _type_hint)
if not _type_hint:
# no type hint given in any form. Last chance is that
# this is a known legacy credential.
# doing this query is a bit expensive, but getting a
# credential is not a high-performance procedure, and
# the gain in convenience is substantial -- otherwise
# users would need to somehow know what they should be
# looking for
_type_hint = dict(_yield_legacy_credential_types()).get(name)
cred = self._get_legacy_credential_from_keyring(
name, _type_hint) or {}
if cred:
# at least some info came from the legacy backend, record that
cred['_from_backend'] = 'legacy'

# and now take whatever we got from the legacy store and update
# it from what we have in the config
var_prefix = _get_cred_cfg_var(name, '')
# get related info from config
cred.update({
cred_update = {
k[len(var_prefix):]: v
for k, v in self._cfg.items()
if k.startswith(var_prefix)
})
}
if set(cred_update.keys()) >= set(
k for k in cred.keys() if k != '_from_backend'):
# we are overwriting all info from a possible legacy credential
# take marker off
cred.pop('_from_backend', None)
cred.update(cred_update)

# final word on the credential type
_type_hint = cred.get('type', kwargs.get('type', _type_hint))
Expand Down Expand Up @@ -220,14 +240,14 @@ def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs):
if secret:
cred['secret'] = secret

if 'type' not in cred and kwargs.get('type'):
# enhance legacy credentials
cred['type'] = kwargs.get('type')

if not cred:
# if there is absolutely nothing to report, report None
return

if 'type' not in cred and _type_hint:
# make sure we always report a type whenever we have any clue
cred['type'] = _type_hint

# report whether there were any edits to the credential record
# (incl. being entirely new), such that consumers can decide
# to save a credentials, once battle-tested
Expand Down Expand Up @@ -348,7 +368,7 @@ def set(self,
# and properties. This will migrate them to the new setup
# over time
type_hint = kwargs.get('type')
cred = self._get_legacy_field_from_keyring(name, type_hint) or {}
cred = self._get_legacy_credential_from_keyring(name, type_hint) or {}
# amend with given properties
cred.update(**kwargs)
# update last-used, if requested
Expand Down Expand Up @@ -500,15 +520,21 @@ def query_(self, **kwargs):
done = set()
known_credentials = set((n, None) for n in self._get_known_credential_names())
from itertools import chain
for name, type_hint in chain(
_yield_legacy_credential_names(),
for name, legacy_type_hint in chain(
_yield_legacy_credential_types(),
known_credentials):
if name in done:
continue
done.add(name)
cred = self.get(name, _prompt=None, _type_hint=type_hint)
if not cred:
continue
cred = self.get(name, _prompt=None, _type_hint=legacy_type_hint)
if not cred and legacy_type_hint:
# this legacy-type credential is not set. We still want to
# report on it, because it is the only way for users that
# discover these predefined credential "slots"
cred = dict(type=legacy_type_hint)
if legacy_type_hint is not None:
# leading underscore to distinguish this internal marker from
# an actual credential property.
# the credentials command will then also treat it as such
cred['_from_backend'] = 'legacy'
if not kwargs:
yield (name, cred)
else:
Expand Down Expand Up @@ -752,7 +778,15 @@ def _unset_credprops_anyscope(self, name, keys):
"configuration scope. Remove manually")
self._cfg.reload()

def _get_legacy_field_from_keyring(self, name, type_hint):
def _get_legacy_credential_from_keyring(
self,
name: str,
type_hint: str,
) -> Dict | None:
"""With a ``type_hint`` given, attempts to retrieve a credential
comprised of all fields defined in ``self._cred_types``. Otherwise
``None`` is returned.
"""
if not type_hint or type_hint not in self._cred_types:
return

Expand All @@ -766,9 +800,12 @@ def _get_legacy_field_from_keyring(self, name, type_hint):
# legacy credentials used property names with underscores,
# but this is no longer syntax-compliant -- fix on read
cred[field.replace('_', '-')] = val
if 'type' not in cred:
cred['type'] = type_hint
return cred

if not cred:
# there is nothing on a legacy credential with this name
return None
else:
return cred

def _get_secret(self, name, type_hint=None):
secret = self._cfg.get(_get_cred_cfg_var(name, 'secret'))
Expand Down Expand Up @@ -875,7 +912,7 @@ def _cred_types(self):
return mapping


def _yield_legacy_credential_names():
def _yield_legacy_credential_types():
# query is constrained by non-secrets, no constraints means report all
# a constraint means *exact* match on all given properties
from datalad.downloaders.providers import (
Expand All @@ -884,9 +921,15 @@ def _yield_legacy_credential_names():
)
type_hints = {v: k for k, v in CREDENTIAL_TYPES.items()}

# ATTN: from Providers.from_config_files() is sensitive to the PWD
# it will only read legacy credentials from datasets whenever
# PWD is inside a dataset
legacy_credentials = set(
(p.credential.name, type(p.credential))
for p in Providers.from_config_files()
# without reload, no changes in files since the last call
# will be considered. That last call might have happended
# in datalad-core, and may have been in another directory
for p in Providers.from_config_files(reload=True)
if p.credential
)
for name, type_ in legacy_credentials:
Expand Down
102 changes: 96 additions & 6 deletions datalad_next/credman/tests/test_credman.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
with_testsui,
)
from datalad_next.datasets import Dataset
from datalad_next.utils import chpwd


def test_credmanager():
Expand All @@ -46,12 +47,6 @@ def check_credmanager():
# this makes it possible to discover credential fragments, if only to
# expose them for clean-up
eq_(credman.get(crazy='empty'), {'crazy': 'empty'})
# smoke test for legacy credential retrieval code
# reporting back a credential, even if empty, exposes the legacy
# credentials (by name), and enables discovery and (re)setting them
# using this newer credential system
eq_(credman.get('donotexiststest', type='user_password'),
{'type': 'user_password'})
# does not fiddle with a secret that is readily provided
eq_(credman.get('dummy', secret='mike', _type_hint='token'),
dict(type='token', secret='mike'))
Expand Down Expand Up @@ -285,3 +280,98 @@ def check_obtain():
assert res == (None,
{'type': 'token', 'secret': 'mynewtoken', '_edited': True,
'realm': 'mytotallynewrealm'})


legacy_provider_cfg = """\
[provider:credmanuniquetestcredentialsetup]
url_re = http://example\\.com/
authentication_type = http_basic_auth
credential = credmanuniquetestcredentialsetup
[credential:credmanuniquetestcredentialsetup]
type = user_password
"""


def test_legacy_credentials(tmp_path):
# we will use a dataset to host a legacy provider config
ds = Dataset(tmp_path).create(result_renderer='disabled')
provider_path = ds.pathobj / '.datalad' / 'providers' / 'mylegacycred.cfg'
provider_path.parent.mkdir(parents=True, exist_ok=True)
provider_path.write_text(legacy_provider_cfg)
# - the legacy code will only ever pick up a dataset credential, when
# PWD is inside a dataset
# - we want all tests to bypass the actual system keyring
# 'datalad.downloaders.credentials.keyring_' is what the UserPassword
# credential will use to store the credential
# - 'datalad.support.keyring_' is what credman uses
# - we need to make them one and the same thing
tmp_keyring = MemoryKeyring()
with chpwd(tmp_path), \
patch('datalad.downloaders.credentials.keyring_', tmp_keyring), \
patch('datalad.support.keyring_.keyring', tmp_keyring):
check_legacy_credentials(ds)


def check_legacy_credentials(ds):
# shortcut
cname = 'credmanuniquetestcredentialsetup'

credman = CredentialManager(ds.config)
# check that we get legacy reports in a query. this is needed to be
# able to even know that they exist
res = dict(credman.query())
assert cname in res
cred = res[cname]
# we always get the type reported
assert cred['type'] == 'user_password'
# we can know that it is a legacy credential
assert cred['_from_backend'] == 'legacy'
# but getting an unset legacy credential will unambiguously say
# "there is none"
assert credman.get(cname) is None

# we want all tests to bypass the actual system keyring
# 'datalad.downloaders.credentials.keyring_' is what the UserPassword
# credential will use to store the credential
from datalad.downloaders.credentials import UserPassword
lc = UserPassword(cname, dataset=ds)
lc.set(user='mike', password='pass123')
# now we should be able to get it from credman too
# and just by name -- no need to provide a type hint
cred = credman.get(cname)
assert cred['user'] == 'mike'
# reporting of the secret is always under the 'secret' key
assert cred['secret'] == 'pass123'
assert cred['type'] == 'user_password'
assert cred['_from_backend'] == 'legacy'

# check migration on set
try:
# setting a credential, will migrate info into the non-legacy
# backend. however, it will not move information _out of_
# the legacy backend, in order to keep old code working
# with the old info
# confirm starting point: legacy code keeps user in secret store
assert credman._keyring.entries[cname]['user'] == 'mike'
assert ds.config.get(f'datalad.credential.{cname}.user') is None
credman.set(cname, **cred)
# it remains there
assert credman._keyring.entries[cname]['user'] == 'mike'
# but is also migrated
assert ds.config.get(f'datalad.credential.{cname}.user') == 'mike'
# NOTE: This setup is not without problems. Users will update
# a credential and will leave an outdated (half) as garbage.
# however, I did not come up with a better approach that gradually
# brings users over.
credman.set(cname, user='newmike', secret='othersecret')
assert credman._keyring.entries[cname]['user'] == 'mike'
# finally check that the update is reported now
cred = credman.get(cname)
assert cred['user'] == 'newmike'
assert cred['secret'] == 'othersecret'
assert cred['type'] == 'user_password'
# no legacy info makes it out, hence no marker
assert cred.get('_from_backend') != 'legacy'
finally:
credman.remove(cname)

0 comments on commit 3e10738

Please sign in to comment.