47
47
from sigstore_protobuf_specs .dev .sigstore .common import v1 as common_v1
48
48
from sigstore_protobuf_specs .dev .sigstore .rekor import v1 as rekor_v1
49
49
from sigstore_protobuf_specs .dev .sigstore .rekor .v1 import (
50
- InclusionPromise ,
51
50
InclusionProof ,
52
51
)
53
52
@@ -187,6 +186,77 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
187
186
inclusion_promise = entry ["verification" ]["signedEntryTimestamp" ],
188
187
)
189
188
189
+ @classmethod
190
+ def _from_dict_rekor (cls , dict_ : dict [str , Any ]) -> LogEntry :
191
+ """
192
+ Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
193
+ """
194
+ tlog_entry = rekor_v1 .TransparencyLogEntry ()
195
+ tlog_entry .from_dict (dict_ )
196
+
197
+ inclusion_proof : InclusionProof | None = tlog_entry .inclusion_proof
198
+ # This check is required by us as the client, not the
199
+ # protobuf-specs themselves.
200
+ if inclusion_proof is None or inclusion_proof .checkpoint .envelope is None :
201
+ raise InvalidBundle ("entry must contain inclusion proof" )
202
+
203
+ parsed_inclusion_proof = LogInclusionProof (
204
+ checkpoint = inclusion_proof .checkpoint .envelope ,
205
+ hashes = [h .hex () for h in inclusion_proof .hashes ],
206
+ log_index = inclusion_proof .log_index ,
207
+ root_hash = inclusion_proof .root_hash .hex (),
208
+ tree_size = inclusion_proof .tree_size ,
209
+ )
210
+
211
+ return LogEntry (
212
+ uuid = None ,
213
+ body = B64Str (base64 .b64encode (tlog_entry .canonicalized_body ).decode ()),
214
+ integrated_time = tlog_entry .integrated_time ,
215
+ log_id = tlog_entry .log_id .key_id .hex (),
216
+ log_index = tlog_entry .log_index ,
217
+ inclusion_proof = parsed_inclusion_proof ,
218
+ inclusion_promise = B64Str (
219
+ base64 .b64encode (
220
+ tlog_entry .inclusion_promise .signed_entry_timestamp
221
+ ).decode ()
222
+ ),
223
+ )
224
+
225
+ def _to_dict_rekor (self , is_message_signature : bool ) -> dict [str , Any ]:
226
+ inclusion_promise : rekor_v1 .InclusionPromise | None = None
227
+ if self .inclusion_promise :
228
+ inclusion_promise = rekor_v1 .InclusionPromise (
229
+ signed_entry_timestamp = base64 .b64decode (self .inclusion_promise )
230
+ )
231
+
232
+ inclusion_proof = rekor_v1 .InclusionProof (
233
+ log_index = self .inclusion_proof .log_index ,
234
+ root_hash = bytes .fromhex (self .inclusion_proof .root_hash ),
235
+ tree_size = self .inclusion_proof .tree_size ,
236
+ hashes = [bytes .fromhex (hash_ ) for hash_ in self .inclusion_proof .hashes ],
237
+ checkpoint = rekor_v1 .Checkpoint (envelope = self .inclusion_proof .checkpoint ),
238
+ )
239
+
240
+ tlog_entry = rekor_v1 .TransparencyLogEntry (
241
+ log_index = self .log_index ,
242
+ log_id = common_v1 .LogId (key_id = bytes .fromhex (self .log_id )),
243
+ integrated_time = self .integrated_time ,
244
+ inclusion_promise = inclusion_promise ,
245
+ inclusion_proof = inclusion_proof ,
246
+ canonicalized_body = base64 .b64decode (self .body ),
247
+ )
248
+
249
+ # Fill in the appropriate kind
250
+ if is_message_signature :
251
+ tlog_entry .kind_version = rekor_v1 .KindVersion (
252
+ kind = "hashedrekord" , version = "0.0.1"
253
+ )
254
+ else :
255
+ tlog_entry .kind_version = rekor_v1 .KindVersion (kind = "dsse" , version = "0.0.1" )
256
+
257
+ tlog_entry_dict : dict [str , Any ] = tlog_entry .to_dict ()
258
+ return tlog_entry_dict
259
+
190
260
def encode_canonical (self ) -> bytes :
191
261
"""
192
262
Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
@@ -366,55 +436,22 @@ def _verify_bundle(self) -> None:
366
436
# The inclusion promise is NOT required; if present, the client
367
437
# SHOULD verify it.
368
438
#
369
- # Beneath all of this, we require that the inclusion proof be present.
370
- inclusion_promise : InclusionPromise | None = tlog_entry .inclusion_promise
371
- inclusion_proof : InclusionProof | None = tlog_entry .inclusion_proof
439
+ # Before all of this, we require that the inclusion proof be present
440
+ # (when constructing the LogEntry).
441
+ log_entry = LogEntry ._from_dict_rekor (tlog_entry .to_dict ())
442
+
372
443
if media_type == BundleType .BUNDLE_0_1 :
373
- if not inclusion_promise :
444
+ if not log_entry . inclusion_promise :
374
445
raise InvalidBundle ("bundle must contain an inclusion promise" )
375
- if inclusion_proof and not inclusion_proof .checkpoint . envelope :
446
+ if not log_entry . inclusion_proof .checkpoint :
376
447
_logger .debug (
377
448
"0.1 bundle contains inclusion proof without checkpoint; ignoring"
378
449
)
379
450
else :
380
- if not inclusion_proof :
381
- raise InvalidBundle ("bundle must contain an inclusion proof" )
382
- if not inclusion_proof .checkpoint .envelope :
451
+ if not log_entry .inclusion_proof .checkpoint :
383
452
raise InvalidBundle ("expected checkpoint in inclusion proof" )
384
453
385
- parsed_inclusion_proof : InclusionProof | None = None
386
- if (
387
- inclusion_proof is not None
388
- and inclusion_proof .checkpoint .envelope is not None
389
- ):
390
- parsed_inclusion_proof = LogInclusionProof (
391
- checkpoint = inclusion_proof .checkpoint .envelope ,
392
- hashes = [h .hex () for h in inclusion_proof .hashes ],
393
- log_index = inclusion_proof .log_index ,
394
- root_hash = inclusion_proof .root_hash .hex (),
395
- tree_size = inclusion_proof .tree_size ,
396
- )
397
-
398
- # Sanity: the only way we can hit this is with a v1 bundle without
399
- # an inclusion proof. Putting this check here rather than above makes
400
- # it clear that this check is required by us as the client, not the
401
- # protobuf-specs themselves.
402
- if parsed_inclusion_proof is None :
403
- raise InvalidBundle ("bundle must contain inclusion proof" )
404
-
405
- self ._log_entry = LogEntry (
406
- uuid = None ,
407
- body = B64Str (base64 .b64encode (tlog_entry .canonicalized_body ).decode ()),
408
- integrated_time = tlog_entry .integrated_time ,
409
- log_id = tlog_entry .log_id .key_id .hex (),
410
- log_index = tlog_entry .log_index ,
411
- inclusion_proof = parsed_inclusion_proof ,
412
- inclusion_promise = B64Str (
413
- base64 .b64encode (
414
- tlog_entry .inclusion_promise .signed_entry_timestamp
415
- ).decode ()
416
- ),
417
- )
454
+ self ._log_entry = log_entry
418
455
419
456
@property
420
457
def signing_certificate (self ) -> Certificate :
@@ -476,30 +513,6 @@ def _from_parts(
476
513
"""
477
514
@private
478
515
"""
479
- inclusion_promise : rekor_v1 .InclusionPromise | None = None
480
- if log_entry .inclusion_promise :
481
- inclusion_promise = rekor_v1 .InclusionPromise (
482
- signed_entry_timestamp = base64 .b64decode (log_entry .inclusion_promise )
483
- )
484
-
485
- inclusion_proof = rekor_v1 .InclusionProof (
486
- log_index = log_entry .inclusion_proof .log_index ,
487
- root_hash = bytes .fromhex (log_entry .inclusion_proof .root_hash ),
488
- tree_size = log_entry .inclusion_proof .tree_size ,
489
- hashes = [bytes .fromhex (hash_ ) for hash_ in log_entry .inclusion_proof .hashes ],
490
- checkpoint = rekor_v1 .Checkpoint (
491
- envelope = log_entry .inclusion_proof .checkpoint
492
- ),
493
- )
494
-
495
- tlog_entry = rekor_v1 .TransparencyLogEntry (
496
- log_index = log_entry .log_index ,
497
- log_id = common_v1 .LogId (key_id = bytes .fromhex (log_entry .log_id )),
498
- integrated_time = log_entry .integrated_time ,
499
- inclusion_promise = inclusion_promise ,
500
- inclusion_proof = inclusion_proof ,
501
- canonicalized_body = base64 .b64decode (log_entry .body ),
502
- )
503
516
504
517
inner = _Bundle (
505
518
media_type = BundleType .BUNDLE_0_3 .value ,
@@ -508,16 +521,15 @@ def _from_parts(
508
521
),
509
522
)
510
523
524
+ is_message_signature = isinstance (content , common_v1 .MessageSignature )
511
525
# Fill in the appropriate variants.
512
- if isinstance ( content , common_v1 . MessageSignature ) :
526
+ if is_message_signature :
513
527
inner .message_signature = content
514
- tlog_entry .kind_version = rekor_v1 .KindVersion (
515
- kind = "hashedrekord" , version = "0.0.1"
516
- )
517
528
else :
518
529
inner .dsse_envelope = content ._inner
519
- tlog_entry .kind_version = rekor_v1 .KindVersion (kind = "dsse" , version = "0.0.1" )
520
530
531
+ tlog_entry = rekor_v1 .TransparencyLogEntry ()
532
+ tlog_entry .from_dict (log_entry ._to_dict_rekor (is_message_signature ))
521
533
inner .verification_material .tlog_entries = [tlog_entry ]
522
534
523
535
return cls (inner )
0 commit comments