18
18
19
19
from __future__ import annotations
20
20
21
+ from collections import defaultdict
21
22
from collections .abc import Iterable
22
23
from dataclasses import dataclass
23
24
from datetime import datetime , timezone
56
57
TrustedRoot as _TrustedRoot ,
57
58
)
58
59
60
+ from sigstore ._internal .fulcio .client import FulcioClient
61
+ from sigstore ._internal .rekor .client import RekorClient
62
+ from sigstore ._internal .timestamp import TimestampAuthorityClient
59
63
from sigstore ._internal .tuf import DEFAULT_TUF_URL , STAGING_TUF_URL , TrustUpdater
60
64
from sigstore ._utils import (
61
65
KeyID ,
66
70
)
67
71
from sigstore .errors import Error , MetadataError , TUFError , VerificationError
68
72
73
+ # Versions supported by this client
74
+ REKOR_VERSIONS = [1 ]
75
+ TSA_VERSIONS = [1 ]
76
+ FULCIO_VERSIONS = [1 ]
77
+ OIDC_VERSIONS = [1 ]
78
+
69
79
70
80
def _is_timerange_valid (period : TimeRange | None , * , allow_expired : bool ) -> bool :
71
81
"""
@@ -323,13 +333,6 @@ def __init__(self, inner: _SigningConfig):
323
333
@api private
324
334
"""
325
335
self ._inner = inner
326
- self ._verify ()
327
-
328
- def _verify (self ) -> None :
329
- """
330
- Performs various feats of heroism to ensure that the signing config
331
- is well-formed.
332
- """
333
336
334
337
# must have a recognized media type.
335
338
try :
@@ -346,6 +349,14 @@ def _verify(self) -> None:
346
349
if self ._inner .tsa_config .selector != ServiceSelector .ANY :
347
350
raise Error (f"unsupported TSA selector { self ._inner .tsa_config .selector } " )
348
351
352
+ # Create lists of service protos that are valid & supported by this client
353
+ self ._tlogs = self ._get_valid_services (
354
+ self ._inner .rekor_tlog_urls , REKOR_VERSIONS
355
+ )
356
+ self ._tsas = self ._get_valid_services (self ._inner .tsa_urls , TSA_VERSIONS )
357
+ self ._fulcios = self ._get_valid_services (self ._inner .ca_urls , FULCIO_VERSIONS )
358
+ self ._oidcs = self ._get_valid_services (self ._inner .oidc_urls , OIDC_VERSIONS )
359
+
349
360
@classmethod
350
361
def from_file (
351
362
cls ,
@@ -355,55 +366,67 @@ def from_file(
355
366
inner = _SigningConfig ().from_json (Path (path ).read_bytes ())
356
367
return cls (inner )
357
368
358
- @staticmethod
359
- def _get_valid_service_url (services : list [Service ]) -> str | None :
369
+ def _get_valid_services (
370
+ self , services : list [Service ], valid_versions : list [int ]
371
+ ) -> list [Service ]:
372
+ """Return supported services, taking SigningConfig restrictions into account"""
373
+
374
+ # split logs by operator, only include valid services
375
+ logs_by_operator : dict [str , list [Service ]] = defaultdict (list )
360
376
for service in services :
361
- if service .major_api_version != 1 :
377
+ if service .major_api_version not in valid_versions :
362
378
continue
363
379
364
380
if not _is_timerange_valid (service .valid_for , allow_expired = False ):
365
381
continue
366
- return service .url
367
- return None
368
382
369
- def get_tlog_urls (self ) -> list [str ]:
383
+ logs_by_operator [service .operator ].append (service )
384
+
385
+ # return a list of services but make sure we only return logs of one version per operator
386
+ result : list [Service ] = []
387
+ for logs in logs_by_operator .values ():
388
+ logs .sort (key = lambda s : s .major_api_version )
389
+ max_version = logs [- 1 ].major_api_version
390
+
391
+ while logs and logs [- 1 ].major_api_version == max_version :
392
+ result .append (logs .pop ())
393
+
394
+ return result
395
+
396
+ def get_tlogs (self ) -> list [RekorClient ]:
370
397
"""
371
398
Returns the rekor transparency logs that client should sign with.
372
- Currently only returns a single one but could in future return several
373
399
"""
374
400
375
- url = self ._get_valid_service_url (self ._inner .rekor_tlog_urls )
376
- if not url :
401
+ if not self ._tlogs :
377
402
raise Error ("No valid Rekor transparency log found in signing config" )
378
- return [url ]
403
+ return [RekorClient ( tlog . url ) for tlog in self . _tlogs ]
379
404
380
- def get_fulcio_url (self ) -> str :
405
+ def get_fulcio (self ) -> FulcioClient :
381
406
"""
382
407
Returns url for the fulcio instance that client should use to get a
383
408
signing certificate from
384
409
"""
385
- url = self ._get_valid_service_url (self ._inner .ca_urls )
386
- if not url :
410
+ if not self ._fulcios :
387
411
raise Error ("No valid Fulcio CA found in signing config" )
388
- return url
412
+ return FulcioClient ( self . _fulcios [ 0 ]. url )
389
413
390
414
def get_oidc_url (self ) -> str :
391
415
"""
392
416
Returns url for the OIDC provider that client should use to interactively
393
417
authenticate.
394
418
"""
395
- url = self ._get_valid_service_url (self ._inner .oidc_urls )
396
- if not url :
419
+ if not self ._oidcs :
397
420
raise Error ("No valid OIDC provider found in signing config" )
398
- return url
421
+ return self . _oidcs [ 0 ]. url
399
422
400
- def get_tsa_urls (self ) -> list [str ]:
423
+ def get_tsas (self ) -> list [TimestampAuthorityClient ]:
401
424
"""
402
- Returns timestamp authority API end points. Currently returns a single one
403
- but may return more in future.
425
+ Returns timestamp authority API end points.
404
426
"""
405
- url = self ._get_valid_service_url (self ._inner .tsa_urls )
406
- return [] if url is None else [url ]
427
+ if not self ._tsas :
428
+ raise Error ("No valid Rekor transparency log found in signing config" )
429
+ return [TimestampAuthorityClient (s .url ) for s in self ._tsas ]
407
430
408
431
409
432
class TrustedRoot :
0 commit comments