From 823089ab80e12a955c26552131b5ef478cd4da50 Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Mon, 29 Apr 2024 23:18:07 +0200 Subject: [PATCH 1/5] sigstore: extract LogEntry conversions to their own functions Signed-off-by: Facundo Tuesca --- sigstore/models.py | 154 ++++++++++++++++++++++++--------------------- 1 file changed, 83 insertions(+), 71 deletions(-) diff --git a/sigstore/models.py b/sigstore/models.py index 985bee168..cc0cf5c02 100644 --- a/sigstore/models.py +++ b/sigstore/models.py @@ -47,7 +47,6 @@ from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1 from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import ( - InclusionPromise, InclusionProof, ) @@ -187,6 +186,77 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: inclusion_promise=entry["verification"]["signedEntryTimestamp"], ) + @classmethod + def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: + """ + Create a new `LogEntry` from the given Rekor TransparencyLogEntry. + """ + tlog_entry = rekor_v1.TransparencyLogEntry() + tlog_entry.from_dict(dict_) + + inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof + # This check is required by us as the client, not the + # protobuf-specs themselves. + if inclusion_proof is None or inclusion_proof.checkpoint.envelope is None: + raise InvalidBundle("entry must contain inclusion proof") + + parsed_inclusion_proof = LogInclusionProof( + checkpoint=inclusion_proof.checkpoint.envelope, + hashes=[h.hex() for h in inclusion_proof.hashes], + log_index=inclusion_proof.log_index, + root_hash=inclusion_proof.root_hash.hex(), + tree_size=inclusion_proof.tree_size, + ) + + return LogEntry( + uuid=None, + body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), + integrated_time=tlog_entry.integrated_time, + log_id=tlog_entry.log_id.key_id.hex(), + log_index=tlog_entry.log_index, + inclusion_proof=parsed_inclusion_proof, + inclusion_promise=B64Str( + base64.b64encode( + tlog_entry.inclusion_promise.signed_entry_timestamp + ).decode() + ), + ) + + def _to_dict_rekor(self, is_message_signature: bool) -> dict[str, Any]: + inclusion_promise: rekor_v1.InclusionPromise | None = None + if self.inclusion_promise: + inclusion_promise = rekor_v1.InclusionPromise( + signed_entry_timestamp=base64.b64decode(self.inclusion_promise) + ) + + inclusion_proof = rekor_v1.InclusionProof( + log_index=self.inclusion_proof.log_index, + root_hash=bytes.fromhex(self.inclusion_proof.root_hash), + tree_size=self.inclusion_proof.tree_size, + hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes], + checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint), + ) + + tlog_entry = rekor_v1.TransparencyLogEntry( + log_index=self.log_index, + log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)), + integrated_time=self.integrated_time, + inclusion_promise=inclusion_promise, + inclusion_proof=inclusion_proof, + canonicalized_body=base64.b64decode(self.body), + ) + + # Fill in the appropriate kind + if is_message_signature: + tlog_entry.kind_version = rekor_v1.KindVersion( + kind="hashedrekord", version="0.0.1" + ) + else: + tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1") + + tlog_entry_dict: dict[str, Any] = tlog_entry.to_dict() + return tlog_entry_dict + def encode_canonical(self) -> bytes: """ Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. @@ -366,55 +436,22 @@ def _verify_bundle(self) -> None: # The inclusion promise is NOT required; if present, the client # SHOULD verify it. # - # Beneath all of this, we require that the inclusion proof be present. - inclusion_promise: InclusionPromise | None = tlog_entry.inclusion_promise - inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof + # Before all of this, we require that the inclusion proof be present + # (when constructing the LogEntry). + log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict()) + if media_type == BundleType.BUNDLE_0_1: - if not inclusion_promise: + if not log_entry.inclusion_promise: raise InvalidBundle("bundle must contain an inclusion promise") - if inclusion_proof and not inclusion_proof.checkpoint.envelope: + if not log_entry.inclusion_proof.checkpoint: _logger.debug( "0.1 bundle contains inclusion proof without checkpoint; ignoring" ) else: - if not inclusion_proof: - raise InvalidBundle("bundle must contain an inclusion proof") - if not inclusion_proof.checkpoint.envelope: + if not log_entry.inclusion_proof.checkpoint: raise InvalidBundle("expected checkpoint in inclusion proof") - parsed_inclusion_proof: InclusionProof | None = None - if ( - inclusion_proof is not None - and inclusion_proof.checkpoint.envelope is not None - ): - parsed_inclusion_proof = LogInclusionProof( - checkpoint=inclusion_proof.checkpoint.envelope, - hashes=[h.hex() for h in inclusion_proof.hashes], - log_index=inclusion_proof.log_index, - root_hash=inclusion_proof.root_hash.hex(), - tree_size=inclusion_proof.tree_size, - ) - - # Sanity: the only way we can hit this is with a v1 bundle without - # an inclusion proof. Putting this check here rather than above makes - # it clear that this check is required by us as the client, not the - # protobuf-specs themselves. - if parsed_inclusion_proof is None: - raise InvalidBundle("bundle must contain inclusion proof") - - self._log_entry = LogEntry( - uuid=None, - body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), - integrated_time=tlog_entry.integrated_time, - log_id=tlog_entry.log_id.key_id.hex(), - log_index=tlog_entry.log_index, - inclusion_proof=parsed_inclusion_proof, - inclusion_promise=B64Str( - base64.b64encode( - tlog_entry.inclusion_promise.signed_entry_timestamp - ).decode() - ), - ) + self._log_entry = log_entry @property def signing_certificate(self) -> Certificate: @@ -476,30 +513,6 @@ def _from_parts( """ @private """ - inclusion_promise: rekor_v1.InclusionPromise | None = None - if log_entry.inclusion_promise: - inclusion_promise = rekor_v1.InclusionPromise( - signed_entry_timestamp=base64.b64decode(log_entry.inclusion_promise) - ) - - inclusion_proof = rekor_v1.InclusionProof( - log_index=log_entry.inclusion_proof.log_index, - root_hash=bytes.fromhex(log_entry.inclusion_proof.root_hash), - tree_size=log_entry.inclusion_proof.tree_size, - hashes=[bytes.fromhex(hash_) for hash_ in log_entry.inclusion_proof.hashes], - checkpoint=rekor_v1.Checkpoint( - envelope=log_entry.inclusion_proof.checkpoint - ), - ) - - tlog_entry = rekor_v1.TransparencyLogEntry( - log_index=log_entry.log_index, - log_id=common_v1.LogId(key_id=bytes.fromhex(log_entry.log_id)), - integrated_time=log_entry.integrated_time, - inclusion_promise=inclusion_promise, - inclusion_proof=inclusion_proof, - canonicalized_body=base64.b64decode(log_entry.body), - ) inner = _Bundle( media_type=BundleType.BUNDLE_0_3.value, @@ -508,16 +521,15 @@ def _from_parts( ), ) + is_message_signature = isinstance(content, common_v1.MessageSignature) # Fill in the appropriate variants. - if isinstance(content, common_v1.MessageSignature): + if is_message_signature: inner.message_signature = content - tlog_entry.kind_version = rekor_v1.KindVersion( - kind="hashedrekord", version="0.0.1" - ) else: inner.dsse_envelope = content._inner - tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1") + tlog_entry = rekor_v1.TransparencyLogEntry() + tlog_entry.from_dict(log_entry._to_dict_rekor(is_message_signature)) inner.verification_material.tlog_entries = [tlog_entry] return cls(inner) From 8a72ac425457d21bfe22bc67ad9facd3b461fae3 Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Mon, 29 Apr 2024 23:56:44 +0200 Subject: [PATCH 2/5] test: add test for roundtrip LogEntry conversion Signed-off-by: Facundo Tuesca --- test/unit/test_models.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/unit/test_models.py b/test/unit/test_models.py index 758738489..0ea4b7c16 100644 --- a/test/unit/test_models.py +++ b/test/unit/test_models.py @@ -34,6 +34,16 @@ def test_missing_inclusion_proof(self): inclusion_promise=None, ) + def test_logentry_roundtrip(self, signing_bundle): + _, bundle = signing_bundle("bundle.txt") + + assert ( + LogEntry._from_dict_rekor( + bundle.log_entry._to_dict_rekor(is_message_signature=True) + ) + == bundle.log_entry + ) + class TestLogInclusionProof: def test_valid(self): From 1f128c19b2c3157cdbfd30c9ca86efc35a32aeac Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Tue, 30 Apr 2024 16:35:28 +0200 Subject: [PATCH 3/5] sigstore,test: deduce LogEntry type from its body Signed-off-by: Facundo Tuesca --- sigstore/models.py | 26 ++++++++++++++++---------- test/unit/test_models.py | 4 +--- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sigstore/models.py b/sigstore/models.py index cc0cf5c02..8a6697c04 100644 --- a/sigstore/models.py +++ b/sigstore/models.py @@ -36,10 +36,12 @@ Field, StrictInt, StrictStr, + TypeAdapter, ValidationInfo, field_validator, ) from pydantic.dataclasses import dataclass +from rekor_types import Dsse, Hashedrekord, ProposedEntry from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1 from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( Bundle as _Bundle, @@ -222,7 +224,7 @@ def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: ), ) - def _to_dict_rekor(self, is_message_signature: bool) -> dict[str, Any]: + def _to_dict_rekor(self) -> dict[str, Any]: inclusion_promise: rekor_v1.InclusionPromise | None = None if self.inclusion_promise: inclusion_promise = rekor_v1.InclusionPromise( @@ -247,12 +249,17 @@ def _to_dict_rekor(self, is_message_signature: bool) -> dict[str, Any]: ) # Fill in the appropriate kind - if is_message_signature: - tlog_entry.kind_version = rekor_v1.KindVersion( - kind="hashedrekord", version="0.0.1" - ) - else: - tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1") + body_entry = TypeAdapter(ProposedEntry).validate_json( + tlog_entry.canonicalized_body + ) + if not isinstance(body_entry, Hashedrekord) and not isinstance( + body_entry, Dsse + ): + raise ValueError("LogEntry is not of expected type") + + tlog_entry.kind_version = rekor_v1.KindVersion( + kind=body_entry.kind, version=body_entry.api_version + ) tlog_entry_dict: dict[str, Any] = tlog_entry.to_dict() return tlog_entry_dict @@ -521,15 +528,14 @@ def _from_parts( ), ) - is_message_signature = isinstance(content, common_v1.MessageSignature) # Fill in the appropriate variants. - if is_message_signature: + if isinstance(content, common_v1.MessageSignature): inner.message_signature = content else: inner.dsse_envelope = content._inner tlog_entry = rekor_v1.TransparencyLogEntry() - tlog_entry.from_dict(log_entry._to_dict_rekor(is_message_signature)) + tlog_entry.from_dict(log_entry._to_dict_rekor()) inner.verification_material.tlog_entries = [tlog_entry] return cls(inner) diff --git a/test/unit/test_models.py b/test/unit/test_models.py index 0ea4b7c16..93b0d0f12 100644 --- a/test/unit/test_models.py +++ b/test/unit/test_models.py @@ -38,9 +38,7 @@ def test_logentry_roundtrip(self, signing_bundle): _, bundle = signing_bundle("bundle.txt") assert ( - LogEntry._from_dict_rekor( - bundle.log_entry._to_dict_rekor(is_message_signature=True) - ) + LogEntry._from_dict_rekor(bundle.log_entry._to_dict_rekor()) == bundle.log_entry ) From 4436f5ca84156ef8aa44643a271ea54379462fc6 Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Tue, 30 Apr 2024 16:49:10 +0200 Subject: [PATCH 4/5] Update sigstore/models.py Co-authored-by: William Woodruff Signed-off-by: Facundo Tuesca --- sigstore/models.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sigstore/models.py b/sigstore/models.py index 8a6697c04..61a13f19a 100644 --- a/sigstore/models.py +++ b/sigstore/models.py @@ -252,9 +252,7 @@ def _to_dict_rekor(self) -> dict[str, Any]: body_entry = TypeAdapter(ProposedEntry).validate_json( tlog_entry.canonicalized_body ) - if not isinstance(body_entry, Hashedrekord) and not isinstance( - body_entry, Dsse - ): + if not isinstance(body_entry, (Hashedrekord, Dsse)): raise ValueError("LogEntry is not of expected type") tlog_entry.kind_version = rekor_v1.KindVersion( From 93491e9c906720932977abd3870fca69b3f3a106 Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Tue, 30 Apr 2024 16:56:11 +0200 Subject: [PATCH 5/5] Update sigstore/models.py Co-authored-by: William Woodruff Signed-off-by: Facundo Tuesca --- sigstore/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sigstore/models.py b/sigstore/models.py index 61a13f19a..cb7f60257 100644 --- a/sigstore/models.py +++ b/sigstore/models.py @@ -253,7 +253,7 @@ def _to_dict_rekor(self) -> dict[str, Any]: tlog_entry.canonicalized_body ) if not isinstance(body_entry, (Hashedrekord, Dsse)): - raise ValueError("LogEntry is not of expected type") + raise InvalidBundle("log entry is not of expected type") tlog_entry.kind_version = rekor_v1.KindVersion( kind=body_entry.kind, version=body_entry.api_version