36
36
Field ,
37
37
StrictInt ,
38
38
StrictStr ,
39
+ TypeAdapter ,
39
40
ValidationInfo ,
40
41
field_validator ,
41
42
)
42
43
from pydantic .dataclasses import dataclass
44
+ from rekor_types import Dsse , Hashedrekord , ProposedEntry
43
45
from sigstore_protobuf_specs .dev .sigstore .bundle import v1 as bundle_v1
44
46
from sigstore_protobuf_specs .dev .sigstore .bundle .v1 import (
45
47
Bundle as _Bundle ,
46
48
)
47
49
from sigstore_protobuf_specs .dev .sigstore .common import v1 as common_v1
48
50
from sigstore_protobuf_specs .dev .sigstore .rekor import v1 as rekor_v1
49
51
from sigstore_protobuf_specs .dev .sigstore .rekor .v1 import (
50
- InclusionPromise ,
51
52
InclusionProof ,
52
53
)
53
54
@@ -187,6 +188,80 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
187
188
inclusion_promise = entry ["verification" ]["signedEntryTimestamp" ],
188
189
)
189
190
191
+ @classmethod
192
+ def _from_dict_rekor (cls , dict_ : dict [str , Any ]) -> LogEntry :
193
+ """
194
+ Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
195
+ """
196
+ tlog_entry = rekor_v1 .TransparencyLogEntry ()
197
+ tlog_entry .from_dict (dict_ )
198
+
199
+ inclusion_proof : InclusionProof | None = tlog_entry .inclusion_proof
200
+ # This check is required by us as the client, not the
201
+ # protobuf-specs themselves.
202
+ if inclusion_proof is None or inclusion_proof .checkpoint .envelope is None :
203
+ raise InvalidBundle ("entry must contain inclusion proof" )
204
+
205
+ parsed_inclusion_proof = LogInclusionProof (
206
+ checkpoint = inclusion_proof .checkpoint .envelope ,
207
+ hashes = [h .hex () for h in inclusion_proof .hashes ],
208
+ log_index = inclusion_proof .log_index ,
209
+ root_hash = inclusion_proof .root_hash .hex (),
210
+ tree_size = inclusion_proof .tree_size ,
211
+ )
212
+
213
+ return LogEntry (
214
+ uuid = None ,
215
+ body = B64Str (base64 .b64encode (tlog_entry .canonicalized_body ).decode ()),
216
+ integrated_time = tlog_entry .integrated_time ,
217
+ log_id = tlog_entry .log_id .key_id .hex (),
218
+ log_index = tlog_entry .log_index ,
219
+ inclusion_proof = parsed_inclusion_proof ,
220
+ inclusion_promise = B64Str (
221
+ base64 .b64encode (
222
+ tlog_entry .inclusion_promise .signed_entry_timestamp
223
+ ).decode ()
224
+ ),
225
+ )
226
+
227
+ def _to_dict_rekor (self ) -> dict [str , Any ]:
228
+ inclusion_promise : rekor_v1 .InclusionPromise | None = None
229
+ if self .inclusion_promise :
230
+ inclusion_promise = rekor_v1 .InclusionPromise (
231
+ signed_entry_timestamp = base64 .b64decode (self .inclusion_promise )
232
+ )
233
+
234
+ inclusion_proof = rekor_v1 .InclusionProof (
235
+ log_index = self .inclusion_proof .log_index ,
236
+ root_hash = bytes .fromhex (self .inclusion_proof .root_hash ),
237
+ tree_size = self .inclusion_proof .tree_size ,
238
+ hashes = [bytes .fromhex (hash_ ) for hash_ in self .inclusion_proof .hashes ],
239
+ checkpoint = rekor_v1 .Checkpoint (envelope = self .inclusion_proof .checkpoint ),
240
+ )
241
+
242
+ tlog_entry = rekor_v1 .TransparencyLogEntry (
243
+ log_index = self .log_index ,
244
+ log_id = common_v1 .LogId (key_id = bytes .fromhex (self .log_id )),
245
+ integrated_time = self .integrated_time ,
246
+ inclusion_promise = inclusion_promise ,
247
+ inclusion_proof = inclusion_proof ,
248
+ canonicalized_body = base64 .b64decode (self .body ),
249
+ )
250
+
251
+ # Fill in the appropriate kind
252
+ body_entry = TypeAdapter (ProposedEntry ).validate_json (
253
+ tlog_entry .canonicalized_body
254
+ )
255
+ if not isinstance (body_entry , (Hashedrekord , Dsse )):
256
+ raise InvalidBundle ("log entry is not of expected type" )
257
+
258
+ tlog_entry .kind_version = rekor_v1 .KindVersion (
259
+ kind = body_entry .kind , version = body_entry .api_version
260
+ )
261
+
262
+ tlog_entry_dict : dict [str , Any ] = tlog_entry .to_dict ()
263
+ return tlog_entry_dict
264
+
190
265
def encode_canonical (self ) -> bytes :
191
266
"""
192
267
Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
@@ -366,55 +441,22 @@ def _verify_bundle(self) -> None:
366
441
# The inclusion promise is NOT required; if present, the client
367
442
# SHOULD verify it.
368
443
#
369
- # Beneath all of this, we require that the inclusion proof be present.
370
- inclusion_promise : InclusionPromise | None = tlog_entry .inclusion_promise
371
- inclusion_proof : InclusionProof | None = tlog_entry .inclusion_proof
444
+ # Before all of this, we require that the inclusion proof be present
445
+ # (when constructing the LogEntry).
446
+ log_entry = LogEntry ._from_dict_rekor (tlog_entry .to_dict ())
447
+
372
448
if media_type == BundleType .BUNDLE_0_1 :
373
- if not inclusion_promise :
449
+ if not log_entry . inclusion_promise :
374
450
raise InvalidBundle ("bundle must contain an inclusion promise" )
375
- if inclusion_proof and not inclusion_proof .checkpoint . envelope :
451
+ if not log_entry . inclusion_proof .checkpoint :
376
452
_logger .debug (
377
453
"0.1 bundle contains inclusion proof without checkpoint; ignoring"
378
454
)
379
455
else :
380
- if not inclusion_proof :
381
- raise InvalidBundle ("bundle must contain an inclusion proof" )
382
- if not inclusion_proof .checkpoint .envelope :
456
+ if not log_entry .inclusion_proof .checkpoint :
383
457
raise InvalidBundle ("expected checkpoint in inclusion proof" )
384
458
385
- parsed_inclusion_proof : InclusionProof | None = None
386
- if (
387
- inclusion_proof is not None
388
- and inclusion_proof .checkpoint .envelope is not None
389
- ):
390
- parsed_inclusion_proof = LogInclusionProof (
391
- checkpoint = inclusion_proof .checkpoint .envelope ,
392
- hashes = [h .hex () for h in inclusion_proof .hashes ],
393
- log_index = inclusion_proof .log_index ,
394
- root_hash = inclusion_proof .root_hash .hex (),
395
- tree_size = inclusion_proof .tree_size ,
396
- )
397
-
398
- # Sanity: the only way we can hit this is with a v1 bundle without
399
- # an inclusion proof. Putting this check here rather than above makes
400
- # it clear that this check is required by us as the client, not the
401
- # protobuf-specs themselves.
402
- if parsed_inclusion_proof is None :
403
- raise InvalidBundle ("bundle must contain inclusion proof" )
404
-
405
- self ._log_entry = LogEntry (
406
- uuid = None ,
407
- body = B64Str (base64 .b64encode (tlog_entry .canonicalized_body ).decode ()),
408
- integrated_time = tlog_entry .integrated_time ,
409
- log_id = tlog_entry .log_id .key_id .hex (),
410
- log_index = tlog_entry .log_index ,
411
- inclusion_proof = parsed_inclusion_proof ,
412
- inclusion_promise = B64Str (
413
- base64 .b64encode (
414
- tlog_entry .inclusion_promise .signed_entry_timestamp
415
- ).decode ()
416
- ),
417
- )
459
+ self ._log_entry = log_entry
418
460
419
461
@property
420
462
def signing_certificate (self ) -> Certificate :
@@ -476,30 +518,6 @@ def _from_parts(
476
518
"""
477
519
@private
478
520
"""
479
- inclusion_promise : rekor_v1 .InclusionPromise | None = None
480
- if log_entry .inclusion_promise :
481
- inclusion_promise = rekor_v1 .InclusionPromise (
482
- signed_entry_timestamp = base64 .b64decode (log_entry .inclusion_promise )
483
- )
484
-
485
- inclusion_proof = rekor_v1 .InclusionProof (
486
- log_index = log_entry .inclusion_proof .log_index ,
487
- root_hash = bytes .fromhex (log_entry .inclusion_proof .root_hash ),
488
- tree_size = log_entry .inclusion_proof .tree_size ,
489
- hashes = [bytes .fromhex (hash_ ) for hash_ in log_entry .inclusion_proof .hashes ],
490
- checkpoint = rekor_v1 .Checkpoint (
491
- envelope = log_entry .inclusion_proof .checkpoint
492
- ),
493
- )
494
-
495
- tlog_entry = rekor_v1 .TransparencyLogEntry (
496
- log_index = log_entry .log_index ,
497
- log_id = common_v1 .LogId (key_id = bytes .fromhex (log_entry .log_id )),
498
- integrated_time = log_entry .integrated_time ,
499
- inclusion_promise = inclusion_promise ,
500
- inclusion_proof = inclusion_proof ,
501
- canonicalized_body = base64 .b64decode (log_entry .body ),
502
- )
503
521
504
522
inner = _Bundle (
505
523
media_type = BundleType .BUNDLE_0_3 .value ,
@@ -511,13 +529,11 @@ def _from_parts(
511
529
# Fill in the appropriate variants.
512
530
if isinstance (content , common_v1 .MessageSignature ):
513
531
inner .message_signature = content
514
- tlog_entry .kind_version = rekor_v1 .KindVersion (
515
- kind = "hashedrekord" , version = "0.0.1"
516
- )
517
532
else :
518
533
inner .dsse_envelope = content ._inner
519
- tlog_entry .kind_version = rekor_v1 .KindVersion (kind = "dsse" , version = "0.0.1" )
520
534
535
+ tlog_entry = rekor_v1 .TransparencyLogEntry ()
536
+ tlog_entry .from_dict (log_entry ._to_dict_rekor ())
521
537
inner .verification_material .tlog_entries = [tlog_entry ]
522
538
523
539
return cls (inner )
0 commit comments